You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@cassandra.apache.org by Brett Marcott <br...@gmail.com> on 2019/04/03 17:21:30 UTC

Infinite loop in org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter

Hi folks,

I am noticing my spark jobs being stuck when using the org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.


It seems that whenever there is a stream failure it may be expected behavior based on the code to infinite loop.

Here are one executors logs:
19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream #59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is complete
19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream #59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed


On stream failure it seems StreamResultFuture sets the exception for the AbstractFuture.
AFAIK this should cause the Abstract future to return a new ExecutionException.

The problem seems to lie in the fact that the CqlBulkRecordWriter swallows the Execution exception and continues in a while loop:
https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274 <https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274>

When taking consecutive thread dumps on the same process I see that the only thread doing work is constantly creating new ExecutionExceptions (the memory location for ExecutionException was different on each thread dump):
java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.util.concurrent.ExecutionException@80240763})
java.lang.Throwable.<init>(Throwable.java:310)
java.lang.Exception.<init>(Exception.java:102)
java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

It seems the logic that lies right below the while loop in linked code above that checks for failed hosts/streamsessions maybe should have been within the while loop?

Thanks,

Brett

Re: Infinite loop in org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter

Posted by Brett Marcott <br...@gmail.com>.
Thanks for the recommendation Russell. Haven't looked into that code yet,
but the docs didn't seem to indicate if it wrote sstables directly instead
of going through normal write path.


On Wed, Apr 3, 2019, 11:11 AM Russell Spitzer <ru...@gmail.com>
wrote:

> I would recommend using the Spark Cassandra Connector instead of the Hadoop
> based writers. The Hadoop code has not had a lot of love in a long time.
> See
>
> https://github.com/datastax/spark-cassandra-connector
>
> On Wed, Apr 3, 2019 at 12:21 PM Brett Marcott <br...@gmail.com>
> wrote:
>
> > Hi folks,
> >
> > I am noticing my spark jobs being stuck when using the
> > org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.
> >
> >
> > It seems that whenever there is a stream failure it may be expected
> > behavior based on the code to infinite loop.
> >
> > Here are one executors logs:
> > 19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream
> > #59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is
> > complete
> > 19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream
> > #59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed
> >
> >
> > On stream failure it seems StreamResultFuture sets the exception for the
> > AbstractFuture.
> > AFAIK this should cause the Abstract future to return a new
> > ExecutionException.
> >
> > The problem seems to lie in the fact that the CqlBulkRecordWriter
> swallows
> > the Execution exception and continues in a while loop:
> >
> >
> https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
> > <
> >
> https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
> > >
> >
> > When taking consecutive thread dumps on the same process I see that the
> > only thread doing work is constantly creating new ExecutionExceptions
> (the
> > memory location for ExecutionException was different on each thread
> dump):
> > java.lang.Throwable.fillInStackTrace(Native Method)
> > java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding
> > Monitor(java.util.concurrent.ExecutionException@80240763})
> > java.lang.Throwable.<init>(Throwable.java:310)
> > java.lang.Exception.<init>(Exception.java:102)
> >
> java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
> >
> >
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
> >
> >
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
> >
> >
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
> >
> >
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
> >
> >
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
> >
> >
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
> >
> >
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
> >
> >
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
> > org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> > org.apache.spark.scheduler.Task.run(Task.scala:99)
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > java.lang.Thread.run(Thread.java:748)
> >
> > It seems the logic that lies right below the while loop in linked code
> > above that checks for failed hosts/streamsessions maybe should have been
> > within the while loop?
> >
> > Thanks,
> >
> > Brett
>

Re: Infinite loop in org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter

Posted by Russell Spitzer <ru...@gmail.com>.
I would recommend using the Spark Cassandra Connector instead of the Hadoop
based writers. The Hadoop code has not had a lot of love in a long time. See

https://github.com/datastax/spark-cassandra-connector

On Wed, Apr 3, 2019 at 12:21 PM Brett Marcott <br...@gmail.com>
wrote:

> Hi folks,
>
> I am noticing my spark jobs being stuck when using the
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter/CqlBulkOutputFormat.
>
>
> It seems that whenever there is a stream failure it may be expected
> behavior based on the code to infinite loop.
>
> Here are one executors logs:
> 19/04/03 15:35:06 INFO streaming.StreamResultFuture: [Stream
> #59290530-5625-11e9-a2bb-8bc7b49d56b0] Session with /10.82.204.173 is
> complete
> 19/04/03 15:35:06 WARN streaming.StreamResultFuture: [Stream
> #59290530-5625-11e9-a2bb-8bc7b49d56b0] Stream failed
>
>
> On stream failure it seems StreamResultFuture sets the exception for the
> AbstractFuture.
> AFAIK this should cause the Abstract future to return a new
> ExecutionException.
>
> The problem seems to lie in the fact that the CqlBulkRecordWriter swallows
> the Execution exception and continues in a while loop:
>
> https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
> <
> https://github.com/apache/cassandra/blob/207c80c1fd63dfbd8ca7e615ec8002ee8983c5d6/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java#L256-L274
> >
>
> When taking consecutive thread dumps on the same process I see that the
> only thread doing work is constantly creating new ExecutionExceptions (the
> memory location for ExecutionException was different on each thread dump):
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding
> Monitor(java.util.concurrent.ExecutionException@80240763})
> java.lang.Throwable.<init>(Throwable.java:310)
> java.lang.Exception.<init>(Exception.java:102)
> java.util.concurrent.ExecutionException.<init>(ExecutionException.java:90)
>
> com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:476)
>
> com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:357)
>
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:257)
>
> org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter.close(CqlBulkRecordWriter.java:237)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1131)
>
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1359)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1131)
>
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:285)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:748)
>
> It seems the logic that lies right below the while loop in linked code
> above that checks for failed hosts/streamsessions maybe should have been
> within the while loop?
>
> Thanks,
>
> Brett