Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/18 10:08:01 UTC

[GitHub] [beam] WojciechM88 opened a new issue, #22314: Apache Beam BigQuery Storage Write Api error checkTimestamp(SimpleDoFnRunner.java:259)

WojciechM88 opened a new issue, #22314:
URL: https://github.com/apache/beam/issues/22314

   Hi,
   I am using Apache Beam version 2.40.0.
   I wrote this pipeline:
   `		Pipeline pipeline = Pipeline.create(options);
   		
   		PCollection<PubsubMessage> messages = pipeline .apply("Read PubSub Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription("subscription"));
   
   		WriteResult out = messages.apply("WriteSuccessfulRecourds",
   				BigQueryIO.<PubsubMessage>write()
   					.withoutValidation()
   					.withFormatFunction((PubsubMessage elem) -> {
   						System.out.println("Start transform: " + elem);
   						TableRow tableRow = new TableRow().set("ID", "1").set("NAME", "TEST").set("DATE", new String(elem.getPayload()));//"2022-01-01T23:15");
   						tableRow.set("messageCustom", elem);
   						return tableRow;
   					})
   					.withCreateDisposition(CreateDisposition.CREATE_NEVER)
   					.withWriteDisposition(WriteDisposition.WRITE_APPEND)
   					.withExtendedErrorInfo()
   					.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
   					//.withTriggeringFrequency(Duration.millis(500))
   					.withNumStorageWriteApiStreams(20)
   					.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
   					.ignoreUnknownValues()
   					//.withExtendedErrorInfo()
   					.to(String.format("%s:%s.%s", project, "TEST_BIG_QUERY", "TEST_BIG_QUERY_TABLE")));
   
   		out.getFailedStorageApiInserts().apply("Error", new PubsubMessageToTableRow(options));
   
   		return pipeline.run();`
   
   After 60 minutes I get this error:
   
   `Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 294247-01-10T04:00:54.776Z. Output timestamps must be no earlier than the timestamp of the current input or timer (294247-01-10T04:00:54.776Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
   	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
   	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
   	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
   	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
   	at pl.woxtech.dataflow.pubsub.to.bigquery.GCPPubSubToBigQuery.run(GCPPubSubToBigQuery.java:132)
   	at pl.woxtech.dataflow.pubsub.to.bigquery.GCPPubSubToBigQuery.main(GCPPubSubToBigQuery.java:73)
   Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 294247-01-10T04:00:54.776Z. Output timestamps must be no earlier than the timestamp of the current input or timer (294247-01-10T04:00:54.776Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
   	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:259)
   	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:85)
   	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.output(SimpleDoFnRunner.java:843)
   	at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
   	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.finalizeStream(StorageApiWritesShardedRecords.java:536)
   	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.onTimer(StorageApiWritesShardedRecords.java:550)`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] WojciechM88 commented on issue #22314: Apache Beam BigQuery Storage Write Api error checkTimestamp(SimpleDoFnRunner.java:259)

Posted by GitBox <gi...@apache.org>.
WojciechM88 commented on issue #22314:
URL: https://github.com/apache/beam/issues/22314#issuecomment-1187156421

   SimpleDoFnRunner contains this line:
   outputTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
   
   Why does it add 1 millisecond?
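
To make the arithmetic behind the error concrete, here is a minimal sketch in plain Java (no Beam dependency). It assumes that BoundedWindow.TIMESTAMP_MAX_VALUE corresponds to Long.MAX_VALUE microseconds truncated to milliseconds, which is consistent with the upper bound printed in the exception. The class name TimestampBounds and its methods are hypothetical, and isAllowedOutput is only a simplified model of the rule stated in the error message, not the actual SimpleDoFnRunner code.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class TimestampBounds {

    // Assumption: Beam's maximum timestamp is Long.MAX_VALUE microseconds,
    // truncated to whole milliseconds.
    static long maxTimestampMillis() {
        return TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
    }

    // The value produced by TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)).
    static long maxPlusOneMillis() {
        return maxTimestampMillis() + 1;
    }

    // Simplified model of the rule in the error message: the output must be
    // no earlier than (input - allowedSkew) and no later than the maximum.
    static boolean isAllowedOutput(long outputMillis, long inputMillis, long allowedSkewMillis) {
        return outputMillis >= inputMillis - allowedSkewMillis
            && outputMillis <= maxTimestampMillis();
    }

    public static void main(String[] args) {
        long max = maxTimestampMillis();
        long plusOne = maxPlusOneMillis();

        // Print both instants so they can be compared with the exception text.
        System.out.println("max:       " + Instant.ofEpochMilli(max));
        System.out.println("max + 1ms: " + Instant.ofEpochMilli(plusOne));

        // Timer and output both sit at max + 1 ms, with zero allowed skew.
        System.out.println("allowed:   " + isAllowedOutput(plusOne, plusOne, 0));
    }
}
```

Under these assumptions the lower bound is satisfied (the output equals the timer timestamp), and the check fails only because max + 1 ms is later than the maximum timestamp.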

