Posted to issues@beam.apache.org by "Marco Veluscek (JIRA)" <ji...@apache.org> on 2019/01/11 09:22:00 UTC

[jira] [Comment Edited] (BEAM-3772) BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS

    [ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739264#comment-16739264 ] 

Marco Veluscek edited comment on BEAM-3772 at 1/11/19 9:21 AM:
---------------------------------------------------------------

I am experiencing the same issue with both Beam 2.6.0 and 2.9.0.

I am also using Spotify Scio 0.6.1.

Is there any news on this issue?

I have written some sample code with Scio to reproduce the problem. Is this code correct?

If I run the following code with the DataflowRunner, I get the same error as in the original post. If I run it with the DirectRunner, it works: all the tables I need are created in BigQuery.
{code:java}
import com.google.api.services.bigquery.model.{TableFieldSchema, TableReference, TableRow, TableSchema}
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, DynamicDestinations, TableDestination}
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.joda.time.Duration

import scala.collection.JavaConverters._

object PubSubToBigQuery {

  case class Row(id: Long, data: String, data2: String, destination: String)

  /**
   * Run the example with:
   * run \
   *   --project=<PROJECT_ID> \
   *   --runner=DataflowRunner \
   *   --stagingLocation=gs://<BUCKET_ID>/staging \
   *   --tempLocation=gs://<BUCKET_ID>/temp \
   *   --region=europe-west1
   *
   * Message example 1: 1,string_data1,string_data2,table_name1
   * Message example 2: 2,string_data1,string_data2,table_name2 --> the creation of this table fails
   */
  def main(cargs: Array[String]): Unit = {

    val (sc, args) = ContextAndArgs(cargs)

    sc
      // Read comma-separated messages from Pub/Sub and parse each one into a Row.
      .pubsubSubscription[String]("projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>")
      .map(s => {
        val fields = s.split(",")
        Row(
          id = fields(0).toLong,
          data = fields(1),
          data2 = fields(2),
          destination = fields(3)
        )
      })
      .saveAsCustomOutput(
        "Custom bigquery IO",
        BigQueryIO.write[Row]()
          .to(new DynamicDestinations[Row, String]() {

            // The destination table name is taken from the message itself (4th field).
            override def getDestination(element: ValueInSingleWindow[Row]): String =
              element.getValue.destination

            // Build the full table reference for a given destination name.
            override def getTable(destination: String): TableDestination =
              new TableDestination(
                new TableReference()
                  .setProjectId("<PROJECT_ID>")
                  .setDatasetId("<DATA_SET_ID>")
                  .setTableId(destination),
                "Table for destination " + destination)

            // Schema used when a table has to be created: an INTEGER id plus
            // a REPEATED RECORD holding the two string payloads.
            override def getSchema(destination: String): TableSchema = {
              val nestedFields = List(new TableFieldSchema().setName("nested_data").setType("STRING")).asJava
              new TableSchema().setFields(
                  List(
                    new TableFieldSchema().setName("id").setType("INTEGER"),
                    new TableFieldSchema()
                      .setName("data")
                      .setType("RECORD")
                      .setMode("REPEATED")
                      .setFields(nestedFields)
                  ).asJava)
            }
          })
          .withFormatFunction(
            // Convert each Row into a TableRow matching the schema above.
            elem => {
              val nestedTableRow = List(
                new TableRow().set("nested_data", elem.data),
                new TableRow().set("nested_data", elem.data2)
              ).asJava
              new TableRow()
                .set("id", elem.id)
                .set("data", nestedTableRow)
            }
          )
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND)
          // Batch loads from an unbounded source: the combination that triggers this bug.
          .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
          .withNumFileShards(1)
          .withTriggeringFrequency(Duration.standardSeconds(1))
      )

    sc.close()
  }
}
{code}
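
As a possible workaround (untested), the destination tables could be created up front, so that the load jobs that Beam erroneously submits with CREATE_NEVER still find their tables. A minimal sketch with the google-cloud-bigquery client, assuming the set of destination names is known in advance (here the two table names from the test messages):
{code:java}
import com.google.cloud.bigquery.{BigQueryOptions, Field, LegacySQLTypeName, Schema, StandardTableDefinition, TableId, TableInfo}

object PrecreateTables {
  def main(args: Array[String]): Unit = {
    val bigquery = BigQueryOptions.getDefaultInstance.getService

    // Same schema as returned by getSchema above.
    val schema = Schema.of(
      Field.of("id", LegacySQLTypeName.INTEGER),
      Field
        .newBuilder("data", LegacySQLTypeName.RECORD,
          Field.of("nested_data", LegacySQLTypeName.STRING))
        .setMode(Field.Mode.REPEATED)
        .build()
    )

    // table_name1 / table_name2 are the destinations used by the test messages.
    Seq("table_name1", "table_name2").foreach { name =>
      val tableId = TableId.of("<PROJECT_ID>", "<DATA_SET_ID>", name)
      bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)))
    }
  }
}
{code}
This obviously defeats the purpose of CREATE_IF_NEEDED when the set of destinations is open-ended, but it should be enough to keep a pipeline running over a fixed set of tables.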


> BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
> --------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3772
>                 URL: https://issues.apache.org/jira/browse/BEAM-3772
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.2.0, 2.3.0
>         Environment: Dataflow streaming pipeline
>            Reporter: Benjamin BENOIST
>            Assignee: Eugene Kirpichov
>            Priority: Major
>
> My workflow: Kafka -> Dataflow streaming -> BigQuery.
> Given that low latency isn't important in my case, I use FILE_LOADS to reduce costs. I'm using _BigQueryIO.Write_ with a _DynamicDestinations_ that writes to a table with the current hour as a suffix.
> This _BigQueryIO.Write_ is configured like this:
> {code:java}
> .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
> .withMethod(Method.FILE_LOADS)
> .withTriggeringFrequency(triggeringFrequency)
> .withNumFileShards(100)
> {code}
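> For illustration only (not the reporter's actual code), such an hour-suffixed destination might look roughly like this sketch, where the base table name and the joda-time formatting are assumptions (project and dataset IDs are taken from the error log below):
> {code:java}
> // Assumes: import org.joda.time.format.DateTimeFormat
> .to(new DynamicDestinations[TableRow, String]() {
>   // e.g. "mytable_20180302_16" for an element in the 16:00 hour
>   override def getDestination(element: ValueInSingleWindow[TableRow]): String =
>     "mytable_" + DateTimeFormat.forPattern("yyyyMMdd_HH").print(element.getTimestamp)
>
>   override def getTable(destination: String): TableDestination =
>     new TableDestination(
>       new TableReference()
>         .setProjectId("myproject-id")
>         .setDatasetId("dev_mydataset")
>         .setTableId(destination),
>       "Hourly table " + destination)
>
>   override def getSchema(destination: String): TableSchema =
>     ??? // the same schema for every hourly table
> })
> {code}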
> The first table is successfully created and written to, but the subsequent tables are never created and I get these exceptions:
> {code:java}
> (99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
>   "configuration" : {
>     "load" : {
>       "createDisposition" : "CREATE_NEVER",
>       "destinationTable" : {
>         "datasetId" : "dev_mydataset",
>         "projectId" : "myproject-id",
>         "tableId" : "mytable_20180302_16"
>       },
> {code}
> The _CreateDisposition_ used is _CREATE_NEVER_, contrary to the _CREATE_IF_NEEDED_ that was specified.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)