Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2018/10/10 12:18:13 UTC
How to use BigQueryIO Method.FILE_LOADS when reading from an
unbounded source?
I am trying to read from an unbounded source and use FILE_LOADS instead
of streaming inserts when writing to BigQuery.
If I don't include the following two lines
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(Duration.standardMinutes(10))
my code works just fine, but uses streaming inserts. If I add them, I get a
nonspecific stack trace like:
Exception in thread "main" java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)
where line 181 is the first line of the following code excerpt:
BigQueryIO.<Event>write()
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))
    .to(
        new DynamicDestinations<Event, String>() {
          @Override
          public String getDestination(ValueInSingleWindow<Event> element) {
            return element.getValue().getTopic();
          }

          @Override
          public TableDestination getTable(String destination) {
            return new TableDestination(
                "charged-dialect-824:KafkaStaging.test",
                null,
                new TimePartitioning().setType("DAY"));
          }

          @Override
          public TableSchema getSchema(String destination) {
            return inputMessagesConfig.getTableSchema(destination);
          }
        })
    .withFormatFunction(
        (SerializableFunction<Event, TableRow>) event -> convertUserEventToTableRow(event))
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
I am not sure what I am doing wrong here. I tried higher values for the
Duration, but that didn't help, and I wasn't able to find the root
cause of the exception with the debugger. Any idea how I can get to the
bottom of this?
Tobi
Re: How to use BigQueryIO Method.FILE_LOADS when reading from an
unbounded source?
Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Hi Wout,
you are so right. I forgot the --tempLocation= parameter when launching
the pipeline, and after that I also needed to set the number of file shards by adding:
.withNumFileShards(1)
Thank you!
Tobi
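For anyone landing here later, the two fixes combine as in the sketch below. This is only a configuration sketch against the Beam Java SDK as used in this thread; the bucket name is a placeholder, and tempLocation is set programmatically here as an alternative to passing --tempLocation on the command line:

```java
// Sketch: the two settings that made triggered FILE_LOADS work in this thread.
// 1) The pipeline's temp location must be a valid gs:// path, because the
//    load job stages files there before loading them into BigQuery.
// 2) The number of file shards must be set explicitly when a triggering
//    frequency is used.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setTempLocation("gs://my-bucket/tmp"); // placeholder bucket

BigQueryIO.Write<Event> write =
    BigQueryIO.<Event>write()
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(10))
        .withNumFileShards(1); // required alongside withTriggeringFrequency
```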
--
Tobias Kaymak
Data Engineer
tobias.kaymak@ricardo.ch
www.ricardo.ch
Re: How to use BigQueryIO Method.FILE_LOADS when reading from an
unbounded source?
Posted by Wout Scheepers <Wo...@vente-exclusive.com>.
Hey Tobias,
org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
points to the following code snippet (starting from BatchLoads.java:210) :
if (bigQueryServices == null) {
  try {
    GcsPath.fromUri(tempLocation);
  } catch (IllegalArgumentException e) {
    throw new IllegalArgumentException(
        String.format(
            "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
            tempLocation),
        e);
  }
}
Are you sure your tempLocation is set correctly? I guess it's needed for staging a BigQuery load job instead of streaming inserts.
Wout
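The failing precondition is essentially a scheme check on the temp location. The standalone sketch below imitates that guard (it is not Beam's actual GcsPath class, just an illustration of why a missing or non-gs:// tempLocation surfaces as an IllegalArgumentException at pipeline construction time):

```java
import java.net.URI;

public class TempLocationCheck {
    // Imitates the guard in BatchLoads: reject anything that is not a gs:// URI.
    static void requireGcsPath(String tempLocation) {
        String raw = tempLocation == null ? "" : tempLocation;
        URI uri = URI.create(raw);
        if (!"gs".equals(uri.getScheme())) {
            throw new IllegalArgumentException(String.format(
                "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
                tempLocation));
        }
    }

    public static void main(String[] args) {
        requireGcsPath("gs://my-bucket/tmp"); // a valid gs:// path passes
        try {
            requireGcsPath(null); // what happens when --tempLocation is forgotten
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```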