You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Marco Veluscek (JIRA)" <ji...@apache.org> on 2019/01/11 09:22:00 UTC
[jira] [Comment Edited] (BEAM-3772) BigQueryIO - Can't use
DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and
FILE_LOADS
[ https://issues.apache.org/jira/browse/BEAM-3772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739264#comment-16739264 ]
Marco Veluscek edited comment on BEAM-3772 at 1/11/19 9:21 AM:
---------------------------------------------------------------
I am experiencing the same issue both with Beam 2.6.0 and 2.9.0.
I am also using Spotify Scio 0.6.1.
Do you have any news about this issue?
I have written some sample code with Scio to reproduce this problem. Is this code correct?
If I run the following code with the DataflowRunner, I get the same error as in the original post. If I run it with the DirectRunner, it works: I am able to create all the tables I need in BigQuery.
{code:java}
import com.google.api.services.bigquery.model.{TableFieldSchema, TableReference, TableRow, TableSchema}
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, DynamicDestinations, TableDestination}
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.joda.time.Duration
import scala.collection.JavaConverters._
object PubSubToBigQuery {

  /** One parsed Pub/Sub message; `destination` is used as the BigQuery table id. */
  case class Row(id: Long, data: String, data2: String, destination: String)

  /**
   * Reads comma-separated messages from a Pub/Sub subscription and writes each one to the
   * BigQuery table named by its fourth field, using FILE_LOADS together with
   * CREATE_IF_NEEDED and a `DynamicDestinations` implementation.
   *
   * Run the example with
   *   run \
   *     --project=<PROJECT_ID> \
   *     --runner=DataflowRunner \
   *     --stagingLocation=gs://<BUCKET_ID>/staging \
   *     --tempLocation=gs://<BUCKET_ID>/temp \
   *     --region=europe-west1
   *
   * Message Example 1: 1,string_data1,string_data2,table_name1
   * Message Example 2: 2,string_data1,string_data2,table_name2 --> the creation of this table fails
   *
   * @param cargs command-line arguments, forwarded to `ContextAndArgs`
   */
  def main(cargs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cargs)

    // Each message is expected to look like "<id>,<data>,<data2>,<destination>".
    val rows = sc
      .pubsubSubscription[String]("projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>")
      .map { message =>
        val parts = message.split(",")
        Row(parts(0).toLong, parts(1), parts(2), parts(3))
      }

    // Sends every element to the table whose id is the element's `destination` field.
    val destinations: DynamicDestinations[Row, String] =
      new DynamicDestinations[Row, String]() {

        override def getDestination(element: ValueInSingleWindow[Row]): String =
          element.getValue.destination

        override def getTable(destination: String): TableDestination = {
          val reference = new TableReference()
            .setProjectId("<PROJECT_ID>")
            .setDatasetId("<DATA_SET_ID>")
            .setTableId(destination)
          new TableDestination(reference, "Table for destination " + destination)
        }

        // The same schema is returned for every destination:
        // an INTEGER `id` and a REPEATED RECORD `data` holding a STRING `nested_data`.
        override def getSchema(destination: String): TableSchema = {
          val idField = new TableFieldSchema().setName("id").setType("INTEGER")
          val dataField = new TableFieldSchema()
            .setName("data")
            .setType("RECORD")
            .setMode("REPEATED")
            .setFields(
              List(new TableFieldSchema().setName("nested_data").setType("STRING")).asJava)
          new TableSchema().setFields(List(idField, dataField).asJava)
        }
      }

    val bigQueryWrite = BigQueryIO
      .write[Row]()
      .to(destinations)
      .withFormatFunction(row => {
        // `data` and `data2` become the two entries of the repeated `data` record.
        val nested = List(row.data, row.data2)
          .map(value => new TableRow().set("nested_data", value))
          .asJava
        new TableRow().set("id", row.id).set("data", nested)
      })
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
      .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
      .withNumFileShards(1)
      .withTriggeringFrequency(Duration.standardSeconds(1))

    rows.saveAsCustomOutput("Custom bigquery IO", bigQueryWrite)

    sc.close()
  }
}
{code}
was (Author: mckay):
I am experiencing the same issue with Beam 2.6.0 and 2.9.0.
I am also using Spotify Scio 0.6.1.
Do you have any news about this issue?
I have written some sample code with Scio to reproduce this problem. Is this code correct?
If I run the following code with the DirectRunner, it works: I am able to create all the tables I need in BigQuery.
{code:java}
import com.google.api.services.bigquery.model.{TableFieldSchema, TableReference, TableRow, TableSchema}
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, DynamicDestinations, TableDestination}
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.joda.time.Duration
import scala.collection.JavaConverters._
object PubSubToBigQuery {

  /** One parsed Pub/Sub message; `destination` is used as the BigQuery table id. */
  case class Row(id: Long, data: String, data2: String, destination: String)

  /**
   * Reads comma-separated messages from a Pub/Sub subscription and writes each one to the
   * BigQuery table named by its fourth field, using FILE_LOADS together with
   * CREATE_IF_NEEDED and a `DynamicDestinations` implementation.
   *
   * Run the example with
   *   run \
   *     --project=<PROJECT_ID> \
   *     --runner=DataflowRunner \
   *     --stagingLocation=gs://<BUCKET_ID>/staging \
   *     --tempLocation=gs://<BUCKET_ID>/temp \
   *     --region=europe-west1
   *
   * Message Example 1: 1,string_data1,string_data2,table_name1
   * Message Example 2: 2,string_data1,string_data2,table_name2 --> the creation of this table fails
   *
   * @param cargs command-line arguments, forwarded to `ContextAndArgs`
   */
  def main(cargs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cargs)

    // Each message is expected to look like "<id>,<data>,<data2>,<destination>".
    val rows = sc
      .pubsubSubscription[String]("projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>")
      .map { message =>
        val parts = message.split(",")
        Row(parts(0).toLong, parts(1), parts(2), parts(3))
      }

    // Sends every element to the table whose id is the element's `destination` field.
    val destinations: DynamicDestinations[Row, String] =
      new DynamicDestinations[Row, String]() {

        override def getDestination(element: ValueInSingleWindow[Row]): String =
          element.getValue.destination

        override def getTable(destination: String): TableDestination = {
          val reference = new TableReference()
            .setProjectId("<PROJECT_ID>")
            .setDatasetId("<DATA_SET_ID>")
            .setTableId(destination)
          new TableDestination(reference, "Table for destination " + destination)
        }

        // The same schema is returned for every destination:
        // an INTEGER `id` and a REPEATED RECORD `data` holding a STRING `nested_data`.
        override def getSchema(destination: String): TableSchema = {
          val idField = new TableFieldSchema().setName("id").setType("INTEGER")
          val dataField = new TableFieldSchema()
            .setName("data")
            .setType("RECORD")
            .setMode("REPEATED")
            .setFields(
              List(new TableFieldSchema().setName("nested_data").setType("STRING")).asJava)
          new TableSchema().setFields(List(idField, dataField).asJava)
        }
      }

    val bigQueryWrite = BigQueryIO
      .write[Row]()
      .to(destinations)
      .withFormatFunction(row => {
        // `data` and `data2` become the two entries of the repeated `data` record.
        val nested = List(row.data, row.data2)
          .map(value => new TableRow().set("nested_data", value))
          .asJava
        new TableRow().set("id", row.id).set("data", nested)
      })
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
      .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
      .withNumFileShards(1)
      .withTriggeringFrequency(Duration.standardSeconds(1))

    rows.saveAsCustomOutput("Custom bigquery IO", bigQueryWrite)

    sc.close()
  }
}
{code}
> BigQueryIO - Can't use DynamicDestination with CREATE_IF_NEEDED for unbounded PCollection and FILE_LOADS
> --------------------------------------------------------------------------------------------------------
>
> Key: BEAM-3772
> URL: https://issues.apache.org/jira/browse/BEAM-3772
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.2.0, 2.3.0
> Environment: Dataflow streaming pipeline
> Reporter: Benjamin BENOIST
> Assignee: Eugene Kirpichov
> Priority: Major
>
> My workflow : KAFKA -> Dataflow streaming -> BigQuery
> Given that having low-latency isn't important in my case, I use FILE_LOADS to reduce the costs. I'm using _BigQueryIO.Write_ with a _DynamicDestination_, which is a table with the current hour as a suffix.
> This _BigQueryIO.Write_ is configured like this :
> {code:java}
> .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
> .withMethod(Method.FILE_LOADS)
> .withTriggeringFrequency(triggeringFrequency)
> .withNumFileShards(100)
> {code}
> The first table is successfully created and is written to. But then the following tables are never created and I get these exceptions:
> {code:java}
> (99e5cd8c66414e7a): java.lang.RuntimeException: Failed to create load job with id prefix 5047f71312a94bf3a42ee5d67feede75_5295fbf25e1a7534f85e25dcaa9f4986_00001_00023, reached max retries: 3, last failed load job: {
> "configuration" : {
> "load" : {
> "createDisposition" : "CREATE_NEVER",
> "destinationTable" : {
> "datasetId" : "dev_mydataset",
> "projectId" : "myproject-id",
> "tableId" : "mytable_20180302_16"
> },
> {code}
> The _CreateDisposition_ used is _CREATE_NEVER_, contrary to the _CREATE_IF_NEEDED_ that was specified.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)