You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2020/07/16 19:09:06 UTC

CassandraIO - random failures in batch mode while writing - how to recover?

Hello,

I am trying to load a table of about 200 GiB from BigQuery into Cassandra
via a batch job in Beam 2.22.0 on Flink 1.10.1. The job runs but fails at
random points in time, throwing different errors each time - and not always
at the same points in the data (which comes in pretty clean from BigQuery
as far as I can see).

I can see here that there seems to be an invalid query string (log level
is debug). Is there a way to set a retry strategy for the CassandraIO, or
is there another way to deal with this situation?

 2020-07-16 16:22:21,926 ERROR org.apache.flink.runtime.operators.BatchTask
                 - Error in task code:  CHAIN MapPartition (MapPartition at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
org.apache.beam.sdk.util.UserCodeException:
java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
        at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
        at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
        at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
        at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at
org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
        at
org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException:
com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
validate.
        at
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
        at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
        at
com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
        at
org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
String didn't validate.
        at
com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
        at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
        at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
        at
com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
        at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
        at
com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
        at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
        at
com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
        at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
        at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at
com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
        at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
        at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
        at
io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Best,
Tobi

Re: CassandraIO - random failures in batch mode while writing - how to recover?

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
OK, so the easiest way out of this was to set the consistency level to
`LOCAL_QUORUM` in the CassandraIO; this way everything went smoothly.

On Thu, Jul 16, 2020 at 9:09 PM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> Hello,
>
> I am trying to load a table that is about 200 GiB in size in BigQuery to
> Cassandra via a batch job in Beam 2.22.0 on Flink 1.10.1 - the job runs but
> fails at random points in time throwing different errors each time - and
> not always at the same points in the data (which comes in pretty clean from
> BigQuery as far as I can see).
>
> I can see here that there seems to be an invalid query string  (log level
> is debug) - do I have a chance to set a retry strategy for the CassandraIO
> or is there another way to deal with this situation?
>
>  2020-07-16 16:22:21,926 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN MapPartition (MapPartition at
> CassandraIO.Write/ParDo(Write)/ParMultiDo(Write)) -> FlatMap (FlatMap at
> CassandraIO.Write/ParDo(Write)/ParMultiDo(Write).output) (2/2)
> org.apache.beam.sdk.util.UserCodeException:
> java.util.concurrent.ExecutionException:
> com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
> validate.
>         at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>         at
> org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>         at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>         at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>         at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>         at
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:143)
>         at
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException:
> com.datastax.driver.core.exceptions.InvalidQueryException: String didn't
> validate.
>         at
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:526)
>         at
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:487)
>         at
> com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:83)
>         at
> org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.waitForFuturesToFinish(CassandraIO.java:1169)
>         at
> org.apache.beam.sdk.io.cassandra.CassandraIO$Mutator.mutate(CassandraIO.java:1148)
>         at
> org.apache.beam.sdk.io.cassandra.CassandraIO$WriteFn.processElement(CassandraIO.java:1034)
> Caused by: com.datastax.driver.core.exceptions.InvalidQueryException:
> String didn't validate.
>         at
> com.datastax.driver.core.Responses$Error.asException(Responses.java:181)
>         at
> com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
>         at
> com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
>         at
> com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:61)
>         at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1005)
>         at
> com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:808)
>         at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1262)
>         at
> com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1180)
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>         at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>         at
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>         at
> io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>         at
> com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>         at
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
>         at
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:748)
>
> Best,
> Tobi
>