Posted to issues@kudu.apache.org by "Junegunn Choi (Jira)" <ji...@apache.org> on 2020/10/26 06:39:00 UTC

[jira] [Updated] (KUDU-3205) NPE in KuduScanTokenBuilder#build after a tablet server goes down

     [ https://issues.apache.org/jira/browse/KUDU-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Junegunn Choi updated KUDU-3205:
--------------------------------
    Description: 
If a tablet server goes down while a Spark query is running, the Kudu client connection becomes unusable because its cached tablet locations have become stale.
h2. Steps to reproduce
h3. Start spark-shell with kudu-spark2 1.13.0

The problem does not occur with kudu-spark2 1.12.0 or earlier because it was introduced by [KUDU-1802|https://github.com/apache/kudu/commit/d23ee5d38ddc4317f431dd65df0c825c00cc968a].
h3. Run a scan query
{code:scala}
import org.apache.kudu.spark.kudu._
val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
dummy.createOrReplaceTempView("dummy")

spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
{code}
h3. Kill a tablet server

Kill one of the tablet servers that are serving data for the query. The query should fail immediately.
{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.lang.RuntimeException: org.apache.kudu.client.NonRecoverableException: Scanner *** not found (it may have expired)
{noformat}
h3. Re-run the query
{code:scala}
spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
{code}
The query still fails, this time with an NPE:
{noformat}
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
  at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:697)
  at org.apache.kudu.spark.kudu.KuduRDD.getPartitions(KuduRDD.scala:95)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 86 more
Caused by: java.lang.NullPointerException
  at org.apache.kudu.client.KuduScanToken$KuduScanTokenBuilder.build(KuduScanToken.java:674)
  ... 117 more
{noformat}
Re-creating the DataFrame doesn't help:
{code:scala}
val dummy = spark.read.options(Map("kudu.master" -> kuduMasters, "kudu.table" -> "dummy")).kudu
dummy.createOrReplaceTempView("dummy")

// Still fails with an NPE
spark.sql("select sum(id), min(val2), max(val2), count(*) from dummy").show
{code}
h2. Cause
{code:java|title=KuduScanToken.java:666}
// Build the list of replica metadata.
List<Client.TabletMetadataPB.ReplicaMetadataPB> replicas = new ArrayList<>();
for (LocatedTablet.Replica replica : remoteTablet.getReplicas()) {
  Integer serverIndex = serverIndexMap.get(
      new HostAndPort(replica.getRpcHost(), replica.getRpcPort()));
  Client.TabletMetadataPB.ReplicaMetadataPB.Builder tabletMetadataBuilder =
      Client.TabletMetadataPB.ReplicaMetadataPB.newBuilder()
          .setRole(replica.getRoleAsEnum())
          .setTsIdx(serverIndex);
  if (replica.getDimensionLabel() != null) {
    tabletMetadataBuilder.setDimensionLabel(replica.getDimensionLabel());
  }
  replicas.add(tabletMetadataBuilder.build());
}
{code}
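{{serverIndex}} can be null here because the replica locations come from a cached entry ({{TableLocationsCache.Entry}}) that is now stale, so a replica's host and port may have no entry in {{serverIndexMap}}. Passing that null {{Integer}} to {{setTsIdx}} (line 674, the innermost frame of the stack trace) auto-unboxes it and throws the NPE.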
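A minimal, self-contained Java sketch of this failure mode, assuming only what the snippet above shows: {{serverIndexMap.get(...)}} returns null for a host and port it does not know, and the protobuf-generated {{setTsIdx}} setter takes a primitive {{int}}. {{HostPort}}, {{setTsIdx}}, {{buildUnchecked}} and {{buildChecked}} are hypothetical stand-ins, not Kudu classes or methods. The checked variant is not the actual fix either: it only replaces the bare NPE with a descriptive error and does nothing to refresh the stale cache entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the loop in KuduScanToken.java:666; these are NOT Kudu classes.
class StaleLocationDemo {

  // Stand-in for Kudu's HostAndPort: a value type usable as a map key.
  static final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
      this.host = host;
      this.port = port;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof HostPort)) return false;
      HostPort other = (HostPort) o;
      return port == other.port && host.equals(other.host);
    }

    @Override
    public int hashCode() {
      return Objects.hash(host, port);
    }
  }

  // Stand-in for ReplicaMetadataPB.Builder#setTsIdx, which takes a primitive int.
  static int setTsIdx(int tsIdx) {
    return tsIdx;
  }

  // Mirrors the original loop: an unknown replica makes get() return null,
  // and auto-unboxing that null into setTsIdx(int) throws a NullPointerException.
  static List<Integer> buildUnchecked(List<HostPort> replicas, Map<HostPort, Integer> serverIndexMap) {
    List<Integer> tsIndices = new ArrayList<>();
    for (HostPort replica : replicas) {
      Integer serverIndex = serverIndexMap.get(replica);
      tsIndices.add(setTsIdx(serverIndex));
    }
    return tsIndices;
  }

  // Defensive variant: fail with a descriptive error instead of a bare NPE.
  static List<Integer> buildChecked(List<HostPort> replicas, Map<HostPort, Integer> serverIndexMap) {
    List<Integer> tsIndices = new ArrayList<>();
    for (HostPort replica : replicas) {
      Integer serverIndex = serverIndexMap.get(replica);
      if (serverIndex == null) {
        throw new IllegalStateException("no server index for " + replica.host + ":" + replica.port
            + " (stale tablet locations?)");
      }
      tsIndices.add(setTsIdx(serverIndex));
    }
    return tsIndices;
  }

  public static void main(String[] args) {
    Map<HostPort, Integer> serverIndexMap = new HashMap<>();
    serverIndexMap.put(new HostPort("ts-1", 7050), 0);
    serverIndexMap.put(new HostPort("ts-2", 7050), 1);

    // Hypothetical stale cache entry: it still lists ts-3, which serverIndexMap does not contain.
    List<HostPort> staleReplicas = new ArrayList<>();
    staleReplicas.add(new HostPort("ts-1", 7050));
    staleReplicas.add(new HostPort("ts-3", 7050));

    try {
      buildUnchecked(staleReplicas, serverIndexMap);
    } catch (NullPointerException e) {
      System.out.println("unchecked: NullPointerException"); // prints "unchecked: NullPointerException"
    }
    try {
      buildChecked(staleReplicas, serverIndexMap);
    } catch (IllegalStateException e) {
      // prints "checked: no server index for ts-3:7050 (stale tablet locations?)"
      System.out.println("checked: " + e.getMessage());
    }
  }
}
```

With fresh locations both variants return the same indices; they only differ when the cached locations reference a server that is missing from the index map.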
h2. Workarounds
 - Restart the Spark shell
 - Wait until the idle connection to the master is closed and cleaned up:
{noformat}
DEBUG Connection: [peer master-***] handling channelInactive
20/10/26 10:37:54 DEBUG Connection: [peer master-***] cleaning up while in state READY due to: connection closed
{noformat}

 - Use kudu-spark2 1.12.0 or earlier


> NPE in KuduScanTokenBuilder#build after a tablet server goes down
> -----------------------------------------------------------------
>
>                 Key: KUDU-3205
>                 URL: https://issues.apache.org/jira/browse/KUDU-3205
>             Project: Kudu
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 1.13.0
>            Reporter: Junegunn Choi
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)