You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2019/10/03 09:37:24 UTC
[nifi] branch master updated: NIFI-6159 - Add BigQuery processor
using the Streaming API
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b12a9ad NIFI-6159 - Add BigQuery processor using the Streaming API
b12a9ad is described below
commit b12a9ad446773f9f043dfef10327691bf3963a07
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Wed Mar 27 16:52:47 2019 +0100
NIFI-6159 - Add BigQuery processor using the Streaming API
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #3394.
---
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 20 ++
.../gcp/bigquery/AbstractBigQueryProcessor.java | 73 ++++----
.../gcp/bigquery/BigQueryAttributes.java | 15 +-
.../processors/gcp/bigquery/PutBigQueryBatch.java | 120 ++++++------
.../gcp/bigquery/PutBigQueryStreaming.java | 201 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 3 +-
.../gcp/bigquery/AbstractBigQueryIT.java | 52 ++++--
.../gcp/bigquery/PutBigQueryBatchIT.java | 22 ++-
.../gcp/bigquery/PutBigQueryStreamingIT.java | 184 +++++++++++++++++++
.../resources/bigquery/streaming-bad-data.json | 35 ++++
.../resources/bigquery/streaming-correct-data.json | 36 ++++
nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 2 +-
12 files changed, 628 insertions(+), 135 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 5d21ef3..2455891 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -40,6 +40,18 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
@@ -88,6 +100,12 @@
<version>1.10.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -100,6 +118,8 @@
<exclude>src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
<exclude>src/test/resources/mock-gcp-service-account.json</exclude>
<exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
+ <exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
+ <exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index 3751060..b2dc43a 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,13 @@
package org.apache.nifi.processors.gcp.bigquery;
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -35,13 +37,11 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.util.StringUtils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
/**
* Base class for creating processors that connect to GCP BigQuery service
@@ -50,14 +50,12 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
static final int BUFFER_SIZE = 65536;
- public static final Relationship REL_SUCCESS =
- new Relationship.Builder().name("success")
- .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
- .build();
- public static final Relationship REL_FAILURE =
- new Relationship.Builder().name("failure")
- .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
- .build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
+ .build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@@ -82,23 +80,14 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
- public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
- .displayName("Table Schema")
- .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
- .required(false)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
- .displayName("Read Timeout")
- .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+ public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
+ .displayName("Ignore Unknown Values")
+ .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
.required(true)
- .defaultValue("5 minutes")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("false")
.build();
@Override
@@ -108,12 +97,11 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return ImmutableList.<PropertyDescriptor>builder()
+ return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(DATASET)
.add(TABLE_NAME)
- .add(TABLE_SCHEMA)
- .add(READ_TIMEOUT)
+ .add(IGNORE_UNKNOWN)
.build();
}
@@ -153,7 +141,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
}
/**
- * If sub-classes needs to implement any custom validation, override this method then add validation result to the results.
+ * If sub-classes need to implement any custom validation, override this method then add
+ * validation result to the results.
*/
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 81978eb..4a379c0 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,8 @@ import com.google.cloud.bigquery.JobInfo;
* Attributes associated with the BigQuery processors
*/
public class BigQueryAttributes {
- private BigQueryAttributes() {}
+ private BigQueryAttributes() {
+ }
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
@@ -102,7 +103,12 @@ public class BigQueryAttributes {
+ "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
+ "types (such as INTEGER).";
+ public static final String RECORD_READER_ATTR = "bq.record.reader";
+ public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
+ public static final String SKIP_INVALID_ROWS_ATTR = "bq.skip.invalid.rows";
+ public static final String SKIP_INVALID_ROWS_DESC = "Sets whether to insert all valid rows of a request, even if invalid "
+ + "rows exist. If not set the entire insert request will fail if it contains an invalid row.";
// Batch Attributes
public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
@@ -129,7 +135,6 @@ public class BigQueryAttributes {
public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
-
// Allowable values
public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 5446c20..cd3cb09 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,16 +17,14 @@
package org.apache.nifi.processors.gcp.bigquery;
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -52,39 +50,33 @@ import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
/**
* A processor for batch loading data into a Google BigQuery table
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"google", "google cloud", "bq", "bigquery"})
+@Tags({ "google", "google cloud", "bq", "bigquery" })
@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
-@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
+@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
@WritesAttributes({
- @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
- @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
@@ -99,7 +91,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
return builder.valid(true).explanation("Contains Expression Language").build();
}
- if(TYPES.contains(input.toUpperCase())) {
+ if (TYPES.contains(input.toUpperCase())) {
builder.valid(true);
} else {
builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
@@ -109,23 +101,31 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
}
};
- public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
- .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
- .displayName("Load file type")
- .description(BigQueryAttributes.SOURCE_TYPE_DESC)
+ public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+ .displayName("Read Timeout")
+ .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
.required(true)
- .addValidator(FORMAT_VALIDATOR)
+ .defaultValue("5 minutes")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+ .displayName("Table Schema")
+ .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
- .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
- .displayName("Ignore Unknown Values")
- .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
+ public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
+ .displayName("Load file type")
+ .description(BigQueryAttributes.SOURCE_TYPE_DESC)
.required(true)
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .allowableValues("true", "false")
- .defaultValue("false")
+ .addValidator(FORMAT_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
@@ -225,13 +225,14 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return ImmutableList.<PropertyDescriptor>builder()
+ return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
+ .add(TABLE_SCHEMA)
+ .add(READ_TIMEOUT)
.add(SOURCE_TYPE)
.add(CREATE_DISPOSITION)
.add(WRITE_DISPOSITION)
.add(MAXBAD_RECORDS)
- .add(IGNORE_UNKNOWN)
.add(CSV_ALLOW_JAGGED_ROWS)
.add(CSV_ALLOW_QUOTED_NEW_LINES)
.add(CSV_CHARSET)
@@ -271,7 +272,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
FormatOptions formatOption;
- if(type.equals(FormatOptions.csv().getType())) {
+ if (type.equals(FormatOptions.csv().getType())) {
formatOption = FormatOptions.csv().toBuilder()
.setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
.setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
@@ -285,18 +286,17 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
}
final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
- final WriteChannelConfiguration writeChannelConfiguration =
- WriteChannelConfiguration.newBuilder(tableId)
+ final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
- .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+ .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
.setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
.setSchema(schema)
.setFormatOptions(formatOption)
.build();
- try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) {
+ try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
session.read(flowFile, rawIn -> {
ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
@@ -337,7 +337,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
// add the number of records successfully added
- if(job.getStatistics() instanceof LoadStatistics) {
+ if (job.getStatistics() instanceof LoadStatistics) {
final LoadStatistics stats = (LoadStatistics) job.getStatistics();
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
new file mode 100644
index 0000000..98457a3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StringUtils;
+
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A processor for streaming data into a Google BigQuery table. It uses the BigQuery
+ * streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
+ * and therefore is the default method when the input is unbounded. BigQuery will make a strong
+ * effort to ensure no duplicates when using this path, however there are some scenarios in which
+ * BigQuery is unable to make this guarantee (see
+ * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
+ * output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
+ * method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
+ * higher.
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
+@CapabilityDescription("Load data into Google BigQuery table using the streaming API. This processor "
+ + "is not intended to load large flow files as it will load the full content into memory. If "
+ + "you need to insert large flow files, consider using PutBigQueryBatch instead.")
+@SeeAlso({ PutBigQueryBatch.class })
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+})
+public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
+
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.RECORD_READER_ATTR)
+ .displayName("Record Reader")
+ .description(BigQueryAttributes.RECORD_READER_DESC)
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
+ .displayName("Skip Invalid Rows")
+ .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("false")
+ .build();
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return ImmutableList.<PropertyDescriptor> builder()
+ .addAll(super.getSupportedPropertyDescriptors())
+ .add(RECORD_READER)
+ .add(SKIP_INVALID_ROWS)
+ .build();
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ super.onScheduled(context);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+ final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+ final TableId tableId;
+ if (StringUtils.isEmpty(projectId)) {
+ tableId = TableId.of(dataset, tableName);
+ } else {
+ tableId = TableId.of(projectId, dataset, tableName);
+ }
+
+ try {
+
+ InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
+ int nbrecord = 0;
+
+ try (final InputStream in = session.read(flowFile)) {
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());) {
+ Record currentRecord;
+ while ((currentRecord = reader.nextRecord()) != null) {
+ request.addRow(convertMapRecord(currentRecord.toMap()));
+ nbrecord++;
+ }
+ }
+ }
+
+ request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
+ request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
+
+ InsertAllResponse response = getCloudService().insertAll(request.build());
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ if (response.hasErrors()) {
+ getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
+ if (getLogger().isDebugEnabled()) {
+ for (long index : response.getInsertErrors().keySet()) {
+ for (BigQueryError e : response.getInsertErrors().get(index)) {
+ getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
+ }
+ }
+ }
+
+ attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
+
+ flowFile = session.penalize(flowFile);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+
+ } catch (Exception ex) {
+ getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ private Map<String, Object> convertMapRecord(Map<String, Object> map) {
+ Map<String, Object> result = new HashMap<String, Object>();
+ for (String key : map.keySet()) {
+ Object obj = map.get(key);
+ if (obj instanceof MapRecord) {
+ result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
+ } else if (obj instanceof Object[]
+ && ((Object[]) obj).length > 0
+ && ((Object[]) obj)[0] instanceof MapRecord) {
+ List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
+ for (Object mapr : ((Object[]) obj)) {
+ lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
+ }
+ result.put(key, lmapr);
+ } else {
+ result.put(key, obj);
+ }
+ }
+ return result;
+ }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4ff9dfb..9d26958 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,4 +18,5 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
\ No newline at end of file
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
index 52327d4..cba730f 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,18 @@
package org.apache.nifi.processors.gcp.bigquery;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.DatasetInfo;
-import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processors.gcp.GCPIntegrationTests;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
@@ -31,24 +36,37 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertNull;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
@Category(GCPIntegrationTests.class)
public abstract class AbstractBigQueryIT {
- static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+ protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+ protected static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi");
+ protected static final String SERVICE_ACCOUNT_JSON = System.getProperty("test.gcp.service.account", "/path/to/service/account.json");
+
protected static BigQuery bigquery;
protected static Dataset dataset;
protected static TestRunner runner;
+ private static final CredentialsFactory credentialsProviderFactory = new CredentialsFactory();
+
@BeforeClass
- public static void beforeClass() {
- dataset = null;
+ public static void beforeClass() throws IOException {
+ final Map<PropertyDescriptor, String> propertiesMap = new HashMap<>();
+ propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, SERVICE_ACCOUNT_JSON);
+ Credentials credentials = credentialsProviderFactory.getGoogleCredentials(propertiesMap, new ProxyAwareTransportFactory(null));
+
BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
+ .setProjectId(PROJECT_ID)
+ .setCredentials(credentials)
.build();
+
bigquery = bigQueryOptions.getService();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
@@ -67,9 +85,11 @@ public abstract class AbstractBigQueryIT {
}
protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
- final Map<String, String> propertiesMap = new HashMap<>();
final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
+ final Map<String, String> propertiesMap = new HashMap<>();
+ propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE.getName(), SERVICE_ACCOUNT_JSON);
+
runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
runner.enableControllerService(credentialsControllerService);
runner.assertValid(credentialsControllerService);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
index 8686213..bd56340 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,13 @@
package org.apache.nifi.processors.gcp.bigquery;
-import com.google.cloud.bigquery.FormatOptions;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@@ -25,12 +31,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import com.google.cloud.bigquery.FormatOptions;
public class PutBigQueryBatchIT extends AbstractBigQueryIT {
@@ -58,6 +59,7 @@ public class PutBigQueryBatchIT extends AbstractBigQueryIT {
@Before
public void setup() {
runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
+ runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
}
@Test
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
new file mode 100644
index 0000000..6bed8be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.FieldValueList;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TableResult;
+
+// Integration tests for PutBigQueryStreaming: each test creates a dedicated table
+// (named after the test method), streams a JSON fixture through the processor via a
+// JsonTreeReader, and verifies both the FlowFile routing/attributes and the rows
+// actually landed in BigQuery. Requires real GCP credentials (see AbstractBigQueryIT).
+public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
+
+ private Schema schema;
+
+ @Before
+ public void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(PutBigQueryStreaming.class);
+ runner = setCredentialsControllerService(runner);
+ runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+ runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
+ }
+
+ // Creates a table matching the JSON fixtures: id (REQUIRED INTEGER), name, a REPEATED
+ // string (alias), a REPEATED record (addresses: zip/city) and a nested record
+ // (job: position/company). The REQUIRED id is what makes streaming-bad-data.json fail.
+ private void createTable(String tableName) {
+ TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+
+ // Table field definition
+ Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Mode.REQUIRED).build();
+ Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+ Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Mode.REPEATED).build();
+
+ Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+ Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+ Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Mode.REPEATED).build();
+
+ Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+ Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+ Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
+
+ // Table schema definition
+ schema = Schema.of(id, name, alias, addresses, job);
+ TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+ TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+
+ // create table
+ bigquery.create(tableInfo);
+ }
+
+ private void deleteTable(String tableName) {
+ TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+ bigquery.delete(tableId);
+ }
+
+ // Happy path: both records of the fixture insert cleanly -> FlowFile to REL_SUCCESS
+ // with bq.records.count = 2, and both rows (including nested/repeated fields) are
+ // readable back from the table.
+ @Test
+ public void PutBigQueryStreamingNoError() throws Exception {
+ // use the test method name as the table name so each test gets its own table
+ String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ createTable(tableName);
+
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+ runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+ TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+ Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+ FieldValueList firstElt = iterator.next();
+ FieldValueList sndElt = iterator.next();
+ assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+ assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+ // listTableData does not guarantee row order, so match rows by name first
+ FieldValueList john;
+ FieldValueList jane;
+ john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+ jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+ // NOTE(review): assertEquals args are (actual, expected) here — reversed from the
+ // JUnit (expected, actual) convention; failure messages will be misleading
+ assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+ // NOTE(review): assertEquals(2, ...) would be clearer than assertTrue(... == 2)
+ assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+ assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+ deleteTable(tableName);
+ }
+
+ // One record lacks the REQUIRED id and skip-invalid-rows is off (default): the whole
+ // insert fails -> FlowFile to REL_FAILURE with bq.records.count = 0 and an empty table.
+ @Test
+ public void PutBigQueryStreamingFullError() throws Exception {
+ String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ createTable(tableName);
+
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+ runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
+ runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");
+
+ TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+ assertFalse(result.getValues().iterator().hasNext());
+
+ deleteTable(tableName);
+ }
+
+ // Same bad fixture but with skip-invalid-rows enabled: the valid record (Jane Doe) is
+ // inserted while the invalid one is skipped -> FlowFile still goes to REL_FAILURE,
+ // with bq.records.count = 1 reflecting the partially successful insert.
+ @Test
+ public void PutBigQueryStreamingPartialError() throws Exception {
+ String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ createTable(tableName);
+
+ runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+ runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+ runner.enableControllerService(jsonReader);
+
+ runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+ runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");
+
+ runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
+ runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
+
+ TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+ Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+ FieldValueList firstElt = iterator.next();
+ assertFalse(iterator.hasNext());
+ // NOTE(review): assertEquals args are (actual, expected) — reversed from convention
+ assertEquals(firstElt.get("name").getStringValue(), "Jane Doe");
+
+ deleteTable(tableName);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
new file mode 100644
index 0000000..1a7ebba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
@@ -0,0 +1,35 @@
+[
+ {
+ "name": "John Doe",
+ "alias": ["john", "jd"],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ },
+ {
+ "zip": "2000",
+ "city": "Bar"
+ }
+ ],
+ "job": {
+ "position": "Manager",
+ "company": "ASF"
+ }
+ },
+ {
+ "id": 2,
+ "name": "Jane Doe",
+ "alias": ["jane"],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ }
+ ],
+ "job": {
+ "position": "Director",
+ "company": "ASF"
+ }
+ }
+ ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json
new file mode 100644
index 0000000..994037f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json
@@ -0,0 +1,36 @@
+[
+ {
+ "id": 1,
+ "name": "John Doe",
+ "alias": ["john", "jd"],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ },
+ {
+ "zip": "2000",
+ "city": "Bar"
+ }
+ ],
+ "job": {
+ "position": "Manager",
+ "company": "ASF"
+ }
+ },
+ {
+ "id": 2,
+ "name": "Jane Doe",
+ "alias": [],
+ "addresses": [
+ {
+ "zip": "1000",
+ "city": "NiFi"
+ }
+ ],
+ "job": {
+ "position": "Director",
+ "company": "ASF"
+ }
+ }
+ ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index 2e8a65a..2f88a31 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,7 +27,7 @@
<packaging>pom</packaging>
<properties>
- <google.cloud.sdk.version>0.101.0-alpha</google.cloud.sdk.version>
+ <google.cloud.sdk.version>0.107.0-alpha</google.cloud.sdk.version>
</properties>
<dependencyManagement>