Posted to issues@nifi.apache.org by danieljimenez <gi...@git.apache.org> on 2018/05/07 12:12:45 UTC

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

GitHub user danieljimenez opened a pull request:

    https://github.com/apache/nifi/pull/2682

    NIFI-4731: BQ Processors and GCP library update.

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [X] Is there a JIRA ticket associated with this PR? Is it referenced
         in the commit message?

    - [X] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

    - [X] Has your PR been rebased against the latest commit within the target branch (typically master)?

    - [X] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [X] Have you written or updated unit tests to verify your changes?
    - [NA] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [NA] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [NA] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [NA] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [X] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/danieljimenez/nifi bigquery-1.7

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2682.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2682
    
----
commit fc372089a16eec95533d9eb8dfeb17b4822b9c4b
Author: Michal Misiewicz <mi...@...>
Date:   2018-04-29T08:01:34Z

    NIFI-4731: BQ Processors and GCP library update.

----


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212762943
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    --- End diff --
    
    not necessary, the explicit no-arg constructor can be removed (the compiler generates a default one)


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212762622
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    +    public static final String DATASET_DESC = "BigQuery dataset";
    +
    +    public static final String TABLE_NAME_ATTR = "bq.table.name";
    +    public static final String TABLE_NAME_DESC = "BigQuery table name";
    +
    +    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
    +    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
    +
    +    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
    +    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
    +
    +    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
    +    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
    +
    +    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
    +    public static final String JOB_ERROR_REASON_DESC = "Load jon error reason";
    --- End diff --
    
    typo: "Load jon" should read "Load job"


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212762648
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    +    public static final String DATASET_DESC = "BigQuery dataset";
    +
    +    public static final String TABLE_NAME_ATTR = "bq.table.name";
    +    public static final String TABLE_NAME_DESC = "BigQuery table name";
    +
    +    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
    +    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
    +
    +    public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
    +    public static final String CREATE_DISPOSITION_DESC = "Options for table creation";
    +
    +    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
    +    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
    +
    +    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
    +    public static final String JOB_ERROR_REASON_DESC = "Load jon error reason";
    +
    +    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
    +    public static final String JOB_ERROR_LOCATION_DESC = "Load jon error location";
    --- End diff --
    
    typo: "Load jon" should read "Load job"


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Hey @danieljimenez - sorry it took so long... I left comments on the PR while playing with the processor on my side (a rebase is also needed now that the recent proxy PR has been merged). Once all the comments are addressed, I believe we can quickly be in a position to merge this in.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216713400
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +        Schema schema = BqUtils.schemaFromString(schemaString);
    +
    +        WriteChannelConfiguration writeChannelConfiguration =
    +                WriteChannelConfiguration.newBuilder(tableId)
    +                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
    +                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
    +                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
    +                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
    +                        .setSchema(schema)
    +                        .setFormatOptions(FormatOptions.of(fileType))
    +                        .build();
    +
    +        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
    +
    +        try {
    +            session.read(flowFile, rawIn -> {
    +                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
    +                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    +                while (readableByteChannel.read(byteBuffer) >= 0) {
    +                    byteBuffer.flip();
    +                    writer.write(byteBuffer);
    +                    byteBuffer.clear();
    +                }
    +            });
    +
    +            writer.close();
    +
    +            Job job = writer.getJob();
    +            PropertyValue property = context.getProperty(READ_TIMEOUT);
    +            Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
    +            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
    +            job = job.waitFor(RetryOption.totalTimeout(duration));
    +
    +            if (job != null) {
    +                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
    +                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
    +                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
    --- End diff --
    
    In case of job success, could we add the number of records we added in the table, using:
    ````java
    LoadStatistics stats = (LoadStatistics) job.getStatistics();
    stats.getOutputRows();
    ````
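    
    A hedged sketch of how that could look in the success branch (the attribute key "bq.records.count" is hypothetical, not defined in this PR; LoadStatistics here is the nested type com.google.cloud.bigquery.JobStatistics.LoadStatistics):
    ````java
    // Sketch only: expose the number of rows the load job wrote as a FlowFile attribute.
    JobStatistics.LoadStatistics stats = (JobStatistics.LoadStatistics) job.getStatistics();
    // "bq.records.count" is a hypothetical attribute name used for illustration.
    attributes.put("bq.records.count", Long.toString(stats.getOutputRows()));
    ````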


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Plus I have no way to test.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216692916
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +        Schema schema = BqUtils.schemaFromString(schemaString);
    --- End diff --
    
    This can throw an exception that should be caught to yield the processor.
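    
    For illustration, a minimal sketch of that guard (assuming REL_FAILURE is inherited from the abstract processor; the catch is left broad since the exceptions BqUtils can throw are not shown here):
    ````java
    final Schema schema;
    try {
        schema = BqUtils.schemaFromString(schemaString);
    } catch (Exception e) {
        // Sketch: log, route the flow file to failure, and yield so the processor backs off.
        getLogger().error("Failed to parse table schema", e);
        session.transfer(session.penalize(flowFile), REL_FAILURE);
        context.yield();
        return;
    }
    ````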


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Hi @danieljimenez - if you are not in a position to work on it, let me know, I'm happy to take your current PR and just submit a commit on top of it. I'd really like to have this processor included in the next release.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r188031584
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml ---
    @@ -80,6 +85,16 @@
                     </exclusion>
                 </exclusions>
             </dependency>
    +        <dependency>
    +            <groupId>com.google.cloud</groupId>
    +            <artifactId>google-cloud-storage</artifactId>
    +            <version>1.25.0</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.google.cloud</groupId>
    +            <artifactId>google-cloud-bigquery</artifactId>
    +            <version>1.25.0</version>
    --- End diff --
    
    @danieljimenez Please rebase it and make the necessary dependency changes that @pvillard31 suggested.
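    
    For reference, a typical rebase flow (the remote name upstream pointing at apache/nifi is an assumption):
    ````
    $ git fetch upstream
    $ git rebase upstream/master
    $ git push --force-with-lease origin bigquery-1.7
    ````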


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r186428592
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java ---
    @@ -36,14 +36,14 @@
      */
     public abstract class AbstractGCPProcessor<
             CloudService extends Service<CloudServiceOptions>,
    -        CloudServiceRpc,
    -        CloudServiceOptions extends HttpServiceOptions<CloudService, CloudServiceRpc, CloudServiceOptions>> extends AbstractProcessor {
    +        CloudServiceOptions extends ServiceOptions<CloudService, CloudServiceOptions>> extends AbstractProcessor {
     
         public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor
                 .Builder().name("gcp-project-id")
                 .displayName("Project ID")
                 .description("Google Cloud Project ID")
                 .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    --- End diff --
    
    I added variable registry support to the GCP project ID.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r188032966
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStream.java ---
    @@ -0,0 +1,360 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.BigQueryError;
    +import com.google.cloud.bigquery.InsertAllRequest;
    +import com.google.cloud.bigquery.InsertAllResponse;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.StandardTableDefinition;
    +import com.google.cloud.bigquery.Table;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.TableInfo;
    +import com.google.common.collect.ImmutableList;
    +import com.google.gson.Gson;
    +import com.google.gson.reflect.TypeToken;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.lang.reflect.Type;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +/**
    + *
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Streams flow files to a Google BigQuery table.")
    +
    +@WritesAttributes({
    +    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.BATCH_SIZE_ATTR, description = BigQueryAttributes.BATCH_SIZE_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.MAX_ROW_SIZE_ATTR, description = BigQueryAttributes.MAX_ROW_SIZE_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.TABLE_CACHE_RESET_ATTR, description = BigQueryAttributes.TABLE_CACHE_RESET_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryStream extends AbstractBigQueryProcessor {
    +    public static final Relationship REL_ROW_TOO_BIG =
    +        new Relationship.Builder().name("row_too_big")
    +                .description("FlowFiles are routed to this relationship if the row size is too big.")
    +                .build();
    +
    +    public static final PropertyDescriptor DATASET = new PropertyDescriptor
    +        .Builder().name(BigQueryAttributes.DATASET_ATTR)
    +        .displayName("Dataset")
    +        .description(BigQueryAttributes.DATASET_DESC)
    +        .required(true)
    +        .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
    +        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
    +        .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
    +        .displayName("Table Name")
    +        .description(BigQueryAttributes.TABLE_NAME_DESC)
    +        .required(true)
    +        .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
    +        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +        .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +        .build();
    +
    +    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
    --- End diff --
    
    I think `TABLE_SCHEMA`, `DATASET`, `TABLE_NAME` can be moved to the `AbstractBigQueryProcessor`
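    
    A hedged sketch of that move (the DATASET definition is copied from this diff; the base class generics are assumed):
    ````java
    public abstract class AbstractBigQueryProcessor
            extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
    
        // Shared by PutBigQueryStream and PutBigQueryBatch once hoisted here.
        public static final PropertyDescriptor DATASET = new PropertyDescriptor
                .Builder().name(BigQueryAttributes.DATASET_ATTR)
                .displayName("Dataset")
                .description(BigQueryAttributes.DATASET_DESC)
                .required(true)
                .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
                .build();
    
        // TABLE_NAME and TABLE_SCHEMA would move here the same way.
    }
    ````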


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    @danieljimenez 
    Based on
    ````
    [WARNING] Files with unapproved licenses:
      /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
      /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
    ````
    The two files are missing the Apache license header at the top. You can just copy/paste the header from another file.


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Is there anything I can do to ensure this makes it to 1.8? Thanks!


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r188031876
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    --- End diff --
    
    I'd prefer it if we could prefix it with `gcp`. Example: `gcp.bq.dataset`. Thoughts?
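    
    For illustration, the hypothetical rename would look like:
    ````java
    public static final String DATASET_ATTR = "gcp.bq.dataset";
    ````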


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216690423
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    --- End diff --
    
    .expressionLanguageSupported(boolean) is deprecated. Can you please specify the expression language scope instead? Also, are dynamic properties actually used somewhere?
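    
    For reference, a minimal sketch of the non-deprecated form (assuming the dynamic properties should support flow file attributes; adjust the scope as appropriate):
    ````
    import org.apache.nifi.expression.ExpressionLanguageScope;
    
    return new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dynamic(true)
            .build();
    ````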


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by jugi92 <gi...@git.apache.org>.
Github user jugi92 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r214683439
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java ---
    @@ -43,7 +43,8 @@
                 .Builder().name("gcp-project-id")
                 .displayName("Project ID")
                 .description("Google Cloud Project ID")
    -            .required(true)
    --- End diff --
    
    For example, to download public buckets like this one:
    https://console.cloud.google.com/storage/browser/gcp-public-data-landsat/?_ga=2.37550372.-565124473.1518597165
    
    I created an issue for that a while ago: 
    https://issues.apache.org/jira/browse/NIFI-4933


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216697497
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +        Schema schema = BqUtils.schemaFromString(schemaString);
    +
    +        WriteChannelConfiguration writeChannelConfiguration =
    +                WriteChannelConfiguration.newBuilder(tableId)
    +                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
    +                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
    +                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
    +                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
    +                        .setSchema(schema)
    +                        .setFormatOptions(FormatOptions.of(fileType))
    +                        .build();
    +
    +        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
    --- End diff --
    
    This should also be in the try/catch, as it can raise an exception which should cause the processor to yield. Example:
    ````
    com.google.cloud.bigquery.BigQueryException: 404 Not Found
    Not Found
    com.google.cloud.bigquery.BigQueryException: 404 Not Found
    Not Found
            at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:99)
            at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.open(HttpBigQueryRpc.java:419)
            at com.google.cloud.bigquery.TableDataWriteChannel$2.call(TableDataWriteChannel.java:82)
            at com.google.cloud.bigquery.TableDataWriteChannel$2.call(TableDataWriteChannel.java:77)
            at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:89)
            at com.google.cloud.RetryHelper.run(RetryHelper.java:74)
            at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:51)
            at com.google.cloud.bigquery.TableDataWriteChannel.open(TableDataWriteChannel.java:76)
            at com.google.cloud.bigquery.TableDataWriteChannel.<init>(TableDataWriteChannel.java:41)
            at com.google.cloud.bigquery.BigQueryImpl.writer(BigQueryImpl.java:729)
            at com.google.cloud.bigquery.BigQueryImpl.writer(BigQueryImpl.java:720)
            at org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch.onTrigger(PutBigQueryBatch.java:211)
            at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
            at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
            at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
            at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: com.google.api.client.http.HttpResponseException: 404 Not Found
    Not Found
            at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1070)
            at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.open(HttpBigQueryRpc.java:416)
            ... 21 common frames omitted
    ````
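    
    A minimal sketch of what guarding the call could look like (assuming the failure relationship inherited from the abstract processor is named `REL_FAILURE`):
    ````
    final TableDataWriteChannel writer;
    try {
        writer = bq.writer(writeChannelConfiguration);
    } catch (Exception e) {
        // e.g. BigQueryException: 404 Not Found when the dataset does not exist
        getLogger().error("Failed to open BigQuery write channel", e);
        session.transfer(flowFile, REL_FAILURE);
        context.yield();
        return;
    }
    ````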


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212763182
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    --- End diff --
    
    are dynamic properties used somewhere?


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212763892
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml ---
    @@ -33,7 +33,7 @@
             <dependency>
                 <groupId>com.google.auth</groupId>
                 <artifactId>google-auth-library-oauth2-http</artifactId>
    -            <version>0.6.0</version>
    +            <version>0.9.0</version>
    --- End diff --
    
    0.11.0 is available


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216694142
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    +    public static final String DATASET_DESC = "BigQuery dataset";
    +
    +    public static final String TABLE_NAME_ATTR = "bq.table.name";
    +    public static final String TABLE_NAME_DESC = "BigQuery table name";
    +
    +    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
    +    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
    --- End diff --
    
    copy/paste error.
    Besides, the description should specify that the schema must be provided as JSON data. At first I tried to specify the schema with ``name:STRING,age:INTEGER,email:STRING``, which is also an option in the BigQuery UI.
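    
    For illustration, a hedged sketch of the JSON form I'd expect `BqUtils.schemaFromString` to accept (the exact field attributes are an assumption on my side):
    ````
    // hypothetical example: schema provided as a JSON array of field definitions
    String schemaString = "[{\"name\": \"name\", \"type\": \"STRING\"},"
            + " {\"name\": \"age\", \"type\": \"INTEGER\"},"
            + " {\"name\": \"email\", \"type\": \"STRING\"}]";
    Schema schema = BqUtils.schemaFromString(schemaString);
    ````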


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216699464
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    +    public static final String DATASET_DESC = "BigQuery dataset";
    --- End diff --
    
    Can we add to the description that the dataset must exist before trying to add data into it (new table or not)?
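    
    Alternatively, a hedged sketch of pre-creating the dataset with the client library (`bq` being the `BigQuery` service object and `dataset` a plain dataset name, as in onTrigger):
    ````
    import com.google.cloud.bigquery.DatasetInfo;
    
    // create the dataset only if it does not already exist
    if (bq.getDataset(dataset) == null) {
        bq.create(DatasetInfo.newBuilder(dataset).build());
    }
    ````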


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r204721738
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    --- End diff --
    
    @zenfenan After looking at this, that doesn't seem to match the defaults used by the other GCP processors, e.g. `gcs.bucket`.


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by jugi92 <gi...@git.apache.org>.
Github user jugi92 commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    @danieljimenez Could you maybe add to this PR the authenticated HTTP proxy support changes that I committed here: https://github.com/apache/nifi/pull/2943/commits/023e0cd3c512f278fb2bd3135843b7c26231fb46
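    
    For context, a rough sketch of the proxy wiring in the spirit of that commit (assumed usage of google-http-client / google-cloud-core APIs; host and port are placeholders):
    ````
    import java.net.InetSocketAddress;
    import java.net.Proxy;
    import com.google.api.client.http.javanet.NetHttpTransport;
    import com.google.cloud.http.HttpTransportOptions;
    
    NetHttpTransport transport = new NetHttpTransport.Builder()
            .setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy.example.com", 3128)))
            .build();
    
    // transportOptions would then be passed into the service options builder used by the processors
    HttpTransportOptions transportOptions = HttpTransportOptions.newBuilder()
            .setHttpTransportFactory(() -> transport)
            .build();
    ````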


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Hey @pvillard31, I'm 100% okay with that. I'll even add you to my fork if you want. I won't be able to circle back to this for a couple of weeks at least.
    
    Thanks for all your help.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r206191230
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,263 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class, PutBigQueryStream.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    public static final int BUFFER_SIZE = 16384;
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +
    +        if (schemaCache == null) {
    +            String schemaStr = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +            schemaCache = BqUtils.schemaFromString(schemaStr);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        WriteChannelConfiguration writeChannelConfiguration =
    +                WriteChannelConfiguration.newBuilder(tableId)
    +                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
    +                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
    +                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
    +                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
    +                        .setSchema(schemaCache)
    +                        .setFormatOptions(FormatOptions.of(fileType))
    +                        .build();
    +        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
    +
    +        try {
    +            session.read(flowFile, rawIn -> {
    +                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
    +                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    +                while (readableByteChannel.read(byteBuffer) > 0) {
    +                    byteBuffer.flip();
    +                    writer.write(byteBuffer);
    +                    byteBuffer.clear();
    +                }
    +            });
    --- End diff --
    
    @zenfenan if you have any input on the above, that would be great. Thanks in advance!


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r189218048
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    --- End diff --
    
    That sounds okay to me, I'll try and cover that in the rebase.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r204829727
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +/**
    + * Attributes associated with the BigQuery processors
    + */
    +public class BigQueryAttributes {
    +    private BigQueryAttributes() {}
    +
    +    public static final String DATASET_ATTR = "bq.dataset";
    --- End diff --
    
    It's purely a nitpick. Feel free to ignore. I just wanted to follow the same approach that many cloud processor bundles use.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216708343
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +        Schema schema = BqUtils.schemaFromString(schemaString);
    +
    +        WriteChannelConfiguration writeChannelConfiguration =
    +                WriteChannelConfiguration.newBuilder(tableId)
    +                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
    +                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
    +                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
    +                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
    +                        .setSchema(schema)
    +                        .setFormatOptions(FormatOptions.of(fileType))
    +                        .build();
    +
    +        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
    +
    +        try {
    +            session.read(flowFile, rawIn -> {
    +                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
    +                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    +                while (readableByteChannel.read(byteBuffer) >= 0) {
    +                    byteBuffer.flip();
    +                    writer.write(byteBuffer);
    +                    byteBuffer.clear();
    +                }
    +            });
    +
    +            writer.close();
    +
    +            Job job = writer.getJob();
    +            PropertyValue property = context.getProperty(READ_TIMEOUT);
    +            Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
    +            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
    +            job = job.waitFor(RetryOption.totalTimeout(duration));
    +
    +            if (job != null) {
    +                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
    +                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
    +                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
    +                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
    +
    +                boolean jobError = (job.getStatus().getError() != null);
    +
    +                if (jobError) {
    +                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
    +                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
    +                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
    +                } else {
    +                    // in case it got looped back from error
    +                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
    +                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
    +                    flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
    +                }
    +
    +                if (!attributes.isEmpty()) {
    +                    flowFile = session.putAllAttributes(flowFile, attributes);
    +                }
    +
    +                if (jobError) {
    +                    flowFile = session.penalize(flowFile);
    +                    session.transfer(flowFile, REL_FAILURE);
    --- End diff --
    
    I would strongly recommend raising a bulletin here:
    ````java
    getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
    ````
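    
    For what it's worth, a hedged sketch of that call in context (names taken from the diff above; the message prefix is my own wording):
    
    ````java
    if (jobError) {
        // Raise a bulletin so the failure shows up in the UI, not only in the flow file attributes
        getLogger().log(LogLevel.WARN, "BigQuery load job failed: " + job.getStatus().getError().getMessage());
        flowFile = session.penalize(flowFile);
        session.transfer(flowFile, REL_FAILURE);
    }
    ````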


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    @pvillard31 thanks, I've added that and force-pushed an update.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216691828
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +        Schema schema = BqUtils.schemaFromString(schemaString);
    +
    +        WriteChannelConfiguration writeChannelConfiguration =
    +                WriteChannelConfiguration.newBuilder(tableId)
    +                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
    +                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
    +                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
    +                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
    +                        .setSchema(schema)
    +                        .setFormatOptions(FormatOptions.of(fileType))
    --- End diff --
    
    In case the input is a CSV file, there are options available to define properties of the dataset (such as header, field delimiter, etc.). Can we expose that to the user? Or, if not, describe how the data will be processed?
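    
    For reference, a sketch of what exposing those could look like with the client library's CsvOptions builder (which options to surface as properties is an open question, so treat the ones below as examples only):
    
    ````java
    // CsvOptions extends FormatOptions, so it can be passed to setFormatOptions()
    CsvOptions csvOptions = CsvOptions.newBuilder()
            .setSkipLeadingRows(1)      // e.g. driven by a "CSV Header" property
            .setFieldDelimiter(",")     // e.g. a "CSV Delimiter" property
            .setQuote("\"")
            .build();
    
    WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId)
            .setFormatOptions(csvOptions)
            .build();
    ````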


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212762691
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.bigquery.Field;
    +import com.google.cloud.bigquery.LegacySQLTypeName;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.gson.Gson;
    +import com.google.gson.reflect.TypeToken;
    +
    +import java.lang.reflect.Type;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +
    +/**
    + *
    + */
    --- End diff --
    
    Can you add a description?


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212761916
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.api.gax.retrying.RetrySettings;
    +import com.google.auth.oauth2.GoogleCredentials;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.BigQueryOptions;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * Base class for creating processors that connect to GCP BiqQuery service
    + */
    +public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
    +    static final int BUFFER_SIZE = 8192;
    --- End diff --
    
    Is it something we want to be hard-coded? Any specific reason for this value?
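    
    If there's no strong reason, a sketch of exposing it as a property instead (the property name and default below are assumptions):
    
    ````java
    public static final PropertyDescriptor BUFFER_SIZE_PROP = new PropertyDescriptor.Builder()
            .name("bq.buffer.size")     // hypothetical property name
            .displayName("Write Buffer Size")
            .description("Buffer size used when streaming flow file content to BigQuery.")
            .required(true)
            .defaultValue("8 KB")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
    
    // then in onTrigger(), instead of the constant:
    int bufferSize = context.getProperty(BUFFER_SIZE_PROP).asDataSize(DataUnit.B).intValue();
    ````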


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216683619
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    --- End diff --
    
    Can we use the AllowableValue object to add a description for each possible value? That would be super useful.
    
    Something like:
    
    ````java
        static final AllowableValue CREATE_IF_NEEDED = new AllowableValue("CREATE_IF_NEEDED", JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
    ````
    
    I'm retrieving the description from the source code.
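    
    The descriptor would then reference the AllowableValue objects instead of raw strings, roughly like this (CREATE_NEVER's description is my paraphrase of the library Javadoc):
    
    ````java
    static final AllowableValue CREATE_NEVER = new AllowableValue("CREATE_NEVER",
            JobInfo.CreateDisposition.CREATE_NEVER.name(),
            "Configures the job to fail with a not-found error if the table does not exist.");
    
    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
            .displayName("Create Disposition")
            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
            .required(true)
            .allowableValues(CREATE_IF_NEEDED, CREATE_NEVER)
            .defaultValue(CREATE_IF_NEEDED.getValue())
            .build();
    ````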


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216662096
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java ---
    @@ -43,7 +43,8 @@
                 .Builder().name("gcp-project-id")
                 .displayName("Project ID")
                 .description("Google Cloud Project ID")
    -            .required(true)
    --- End diff --
    
    OK, got it. So it's for the other JIRA. Good to mention it so we're sure to update the JIRA when we merge this one.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216687983
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java ---
    @@ -43,7 +43,8 @@
                 .Builder().name("gcp-project-id")
                 .displayName("Project ID")
                 .description("Google Cloud Project ID")
    -            .required(true)
    --- End diff --
    
    Can you add a customValidate() to ensure this property is set in processors where it is required (such as the one you're proposing here)?
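    
    A rough sketch of what that could look like, using the standard NiFi customValidate() override (the exact rule and wording are up to you):
    
    ````java
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        // Collection/ArrayList from java.util; ValidationContext and ValidationResult
        // from org.apache.nifi.components
        final Collection<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
        final String projectId = validationContext.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
        if (StringUtils.isEmpty(projectId)) {
            results.add(new ValidationResult.Builder()
                    .subject(PROJECT_ID.getDisplayName())
                    .valid(false)
                    .explanation("Project ID is required for this processor")
                    .build());
        }
        return results;
    }
    ````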


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Hi @danieljimenez - thanks for this PR. I'll try to find time to review it if no one else does. @zenfenan might be in a position to help.
    
    Just a quick comment to let you know that I'm about to merge #2680 to improve the way dependencies are managed for this bundle. You might need to rebase your PR but it shouldn't be a big change. Basically, you wouldn't have to specify the version of your dependencies as the versions are already set within the google-cloud pom file.
    
    Thanks again!


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r206158466
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,263 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class, PutBigQueryStream.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +    public static final int BUFFER_SIZE = 16384;
    +
    +    private Schema schemaCache = null;
    +
    +    public PutBigQueryBatch() {
    +
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.<PropertyDescriptor>builder()
    +                .addAll(super.getSupportedPropertyDescriptors())
    +                .add(DATASET)
    +                .add(TABLE_NAME)
    +                .add(TABLE_SCHEMA)
    +                .add(SOURCE_TYPE)
    +                .add(CREATE_DISPOSITION)
    +                .add(WRITE_DISPOSITION)
    +                .add(MAXBAD_RECORDS)
    +                .add(IGNORE_UNKNOWN)
    +                .build();
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        super.onScheduled(context);
    +
    +        if (schemaCache == null) {
    +            String schemaStr = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    +            schemaCache = BqUtils.schemaFromString(schemaStr);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        final BigQuery bq = getCloudService();
    +
    +        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
    +        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
    +        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        final TableId tableId;
    +        if (StringUtils.isEmpty(projectId)) {
    +            tableId = TableId.of(dataset, tableName);
    +        } else {
    +            tableId = TableId.of(projectId, dataset, tableName);
    +        }
    +
    +        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
    +
    +        WriteChannelConfiguration writeChannelConfiguration =
    +                WriteChannelConfiguration.newBuilder(tableId)
    +                        .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
    +                        .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
    +                        .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
    +                        .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
    +                        .setSchema(schemaCache)
    +                        .setFormatOptions(FormatOptions.of(fileType))
    +                        .build();
    +        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
    +
    +        try {
    +            session.read(flowFile, rawIn -> {
    +                ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
    +                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
    +                while (readableByteChannel.read(byteBuffer) > 0) {
    +                    byteBuffer.flip();
    +                    writer.write(byteBuffer);
    +                    byteBuffer.clear();
    +                }
    +            });
    --- End diff --
    
    Does anyone have any thoughts on the above? When I run it from an integration test it works great, but when I actually use it in NiFi it writes 0-byte tables. I moved to this approach to handle large files (where the content is too large to fit in a byte array).
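    
    One thing I'm wondering about: ReadableByteChannel.read() can legally return 0 before end of stream, so the `> 0` condition might end the loop early. The conventional pattern would loop until the channel returns -1 and drain the buffer fully on each pass, something like this minimal sketch (names from the diff above):
    
    ````java
    while (readableByteChannel.read(byteBuffer) != -1) {
        byteBuffer.flip();
        // write() may not consume the whole buffer in one call
        while (byteBuffer.hasRemaining()) {
            writer.write(byteBuffer);
        }
        byteBuffer.clear();
    }
    ````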


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Hi, sorry for the rebases. This is ready now.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2682


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216683722
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    --- End diff --
    
    Same comment applies here.


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    I'd rather not; the scope of this keeps moving. I'd like to keep it focused on master right now.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r212762507
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java ---
    @@ -43,7 +43,8 @@
                 .Builder().name("gcp-project-id")
                 .displayName("Project ID")
                 .description("Google Cloud Project ID")
    -            .required(true)
    --- End diff --
    
    In what case could the project ID be null?
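    
    (A condensed sketch of the fallback already visible in the onTrigger snippet earlier in this thread, using the same names: when the property is left empty, the TableId is built without a project and the client falls back to the default project of its credentials.)
    
    ````java
    // Sketch of the existing fallback: no configured project id means
    // BigQuery resolves the table against the client's default project.
    TableId tableId = StringUtils.isEmpty(projectId)
            ? TableId.of(dataset, tableName)
            : TableId.of(projectId, dataset, tableName);
    ````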


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    Thanks for the feedback @danieljimenez - I just pushed commit 6d55066fa6ac3e6adf60e566ecaa7f629def11a1. Let me know if you're able to add it on top of your PR or if you want me to open a new PR instead of this one.


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216704031
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.api.gax.retrying.RetrySettings;
    +import com.google.auth.oauth2.GoogleCredentials;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.BigQueryOptions;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * Base class for creating processors that connect to the GCP BigQuery service
    + */
    +public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
    +    static final int BUFFER_SIZE = 65536;
    +    public static final Relationship REL_SUCCESS =
    +            new Relationship.Builder().name("success")
    +                    .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
    +                    .build();
    +    public static final Relationship REL_FAILURE =
    +            new Relationship.Builder().name("failure")
    +                    .description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
    +                    .build();
    +
    +    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
    +            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +
    +    public static final PropertyDescriptor DATASET = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.DATASET_ATTR)
    +            .displayName("Dataset")
    +            .description(BigQueryAttributes.DATASET_DESC)
    +            .required(true)
    +            .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
    +            .displayName("Table Name")
    +            .description(BigQueryAttributes.TABLE_NAME_DESC)
    +            .required(true)
    +            .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
    +            .displayName("Table Schema")
    +            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
    +            .required(false)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor
    --- End diff --
    
    You missed that property in the list of supported property descriptors for the processor. It causes an NPE:
    ````
    2018-09-11 17:00:21,606 ERROR [Timer-Driven Process Thread-6] o.a.n.p.gcp.bigquery.PutBigQueryBatch PutBigQueryBatch[id=c8b9f07f-0165-1000-80e2-3ab46d7fb69f] null: java.lang.NullPointerException
    java.lang.NullPointerException: null
            at org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch.onTrigger(PutBigQueryBatch.java:229)
            at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
            at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
            at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
            at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    ````
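    
    For anyone hitting the same thing: NiFi only resolves defaults for descriptors that the processor returns from getSupportedPropertyDescriptors(), so an unregistered property comes back with a null value and whatever is then called on it in onTrigger throws the NPE above. A sketch of the usual one-line fix (the builder shown here is an assumption about how this class assembles its list; only READ_TIMEOUT is the entry reported missing):
    
    ````java
    // Sketch: register READ_TIMEOUT so the framework applies its default
    // value and validation before onTrigger reads it.
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return ImmutableList.<PropertyDescriptor>builder()
                .addAll(super.getSupportedPropertyDescriptors())
                .add(READ_TIMEOUT)
                .build();
    }
    ````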


---

[GitHub] nifi pull request #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216690236
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java ---
    @@ -0,0 +1,269 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.cloud.RetryOption;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.FormatOptions;
    +import com.google.cloud.bigquery.Job;
    +import com.google.cloud.bigquery.JobInfo;
    +import com.google.cloud.bigquery.Schema;
    +import com.google.cloud.bigquery.TableDataWriteChannel;
    +import com.google.cloud.bigquery.TableId;
    +import com.google.cloud.bigquery.WriteChannelConfiguration;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.LogLevel;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
    +import org.apache.nifi.processors.gcp.storage.PutGCSObject;
    +import org.apache.nifi.util.StringUtils;
    +import org.threeten.bp.Duration;
    +import org.threeten.bp.temporal.ChronoUnit;
    +
    +import java.nio.ByteBuffer;
    +import java.nio.channels.Channels;
    +import java.nio.channels.ReadableByteChannel;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A processor for batch loading data into a Google BigQuery table
    + */
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google cloud", "bq", "bigquery"})
    +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
    +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
    +
    +@WritesAttributes({
    +        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
    +        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
    +})
    +
    +public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    +
    +    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
    +            .displayName("Load file type")
    +            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
    +            .required(true)
    +            .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType())
    +            .defaultValue(FormatOptions.avro().getType())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
    +            .displayName("Ignore Unknown Values")
    +            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .allowableValues("true", "false")
    +            .defaultValue("true")
    +            .build();
    +
    +    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
    +            .displayName("Create Disposition")
    +            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name())
    +            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
    +            .displayName("Write Disposition")
    +            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
    +            .required(true)
    +            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
    +            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
    +            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
    +            .displayName("Max Bad Records")
    +            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
    +            .required(true)
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private Schema schemaCache = null;
    --- End diff --
    
    Never used AFAICT
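    
    A possible cleanup, sketched under two assumptions: the schema is only needed when the WriteChannelConfiguration is built, and the BqUtils.schemaFromString helper from the earlier revision of this diff still exists. Then the field can be dropped and the property parsed where it is consumed:
    
    ````java
    // Sketch: parse the optional schema inline instead of caching it in a
    // field that nothing reads.
    String schemaStr = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
    Schema schema = StringUtils.isEmpty(schemaStr) ? null : BqUtils.schemaFromString(schemaStr);
    ````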


---

[GitHub] nifi issue #2682: NIFI-4731: BQ Processors and GCP library update.

Posted by danieljimenez <gi...@git.apache.org>.
Github user danieljimenez commented on the issue:

    https://github.com/apache/nifi/pull/2682
  
    I'm not sure about the RAT plugin, but this is ready otherwise.


---