You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2020/07/16 19:09:06 UTC
CassandraIO - random failures in batch mode while writing - how to recover?
Hello,
I am trying to load a table that is about 200 GiB in size from BigQuery into
Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1. The job runs but
fails at random points in time, throwing different errors each time - and
not always at the same points in the data (which comes in pretty clean from
BigQuery as far as I can see).
I can see here that there seems to be an invalid query string (log level
is debug) - is there a way to set a retry strategy for CassandraIO,
or is there another way to deal with this situation?
2020-07-16 16:22:21,926 ERROR org.apache.flink.runtime.operators.BatchTask
- Error in task code: CHAIN MapPartition (MapPartition at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
org.apache.beam.sdk.util.UserCodeException:
java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
at
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
at
com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
String didn't validate.
at
com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
at
com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at
com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at
io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Best,
Tobi
Re: CassandraIO - random failures in batch mode while writing - how
to recover?
Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
OK, so the easiest way out of this was to set the consistency level to
`LOCAL_QUORUM` in CassandraIO; with that, everything went smoothly.
On Thu, Jul 16, 2020 at 9:09 PM Kaymak, Tobias <to...@ricardo.ch>
wrote:
> Hello,
>
> I am trying to load a table that is about 200 GiB in size in BigQuery to
> Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1 - the job runs but
> fails at random points in time throwing different errors each time - and
> not always at the same points in the data (which comes in pretty clean from
> BigQuery as far as I can see).
>
> I can see here that there seems to be an invalid query string (log level
> is debug) - do I have a chance to set a retry strategy for the CassandraIO
> or is there another way to deal with this situation?
>
> 2020-07-16 16:22:21,926 ERROR
> org.apache.flink.runtime.operators.BatchTask - Error in
> task code: CHAIN MapPartition (MapPartition at
> CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
> CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
> org.apache.beam.sdk.util.UserCodeException:
> java.util.concurrent.ExecutionException:
> com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
> validate.
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
> at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
> validate.
> at
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
> at
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
> at
> com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
> at
> org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
> Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
> String didn't validate.
> at
> com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
> at
> com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
> at
> com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
> at
> com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
> at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
> at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
> at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
> at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
> at
> com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
> at
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.lang.Thread.run(Thread.java:748)
>
> Best,
> Tobi
>