Posted to issues@spark.apache.org by "Morten Hornbech (JIRA)" <ji...@apache.org> on 2016/11/09 12:23:58 UTC

[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

    [ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650788#comment-15650788 ] 

Morten Hornbech commented on SPARK-16087:
-----------------------------------------

I can reproduce this issue on Spark 2.0.1 using the Cassandra connector. A double union hangs on collect(); a single union completes fine. Test below:

import com.datastax.driver.core.Cluster
import com.datastax.spark.connector._
import com.websudos.phantom.connectors.KeySpaceDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.FunSuite

case class TestData(id: String)

class ReproTest extends FunSuite {

  test("error reproduction scenario") {

    // SETUP: Build key space and tables.
    val keySpaceName = "test"
    val tableA = "test_table_a"
    val tableB = "test_table_b"
    val tableC = "test_table_c"
    val builder = Cluster.builder().addContactPoint("127.0.0.1").withPort(9142)
    val keySpaceDef = new KeySpaceDef(
      keySpaceName,
      _ => builder,
      true,
      Some((ses, ks) => s"CREATE KEYSPACE IF NOT EXISTS $keySpaceName WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"))

    keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableA}")
    keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableB}")
    keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableC}")

    // SETUP: Create spark session.
    val config = new SparkConf()
      .setMaster("local[*]")
      .set("spark.cassandra.connection.port", "9142")
      .setAppName("test")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.sql.warehouse.dir", "file:///C:/temp")

    val session = SparkSession.builder().config(config).getOrCreate()

    // SETUP: Create and persist data.
    val data = List(TestData("Foo"))
    val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))

    frame.createCassandraTable(keySpaceName, tableA)
    frame.createCassandraTable(keySpaceName, tableB)
    frame.createCassandraTable(keySpaceName, tableC)

    frame
      .write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
      .save()

    frame
      .write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
      .save()

    frame
      .write
      .format("org.apache.spark.sql.cassandra")
      .mode(SaveMode.Append)
      .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
      .save()

    // TEST: Load frames and register temp views.
    session.sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
      .load()
      .createOrReplaceTempView("A")

    session.sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
      .load()
      .createOrReplaceTempView("B")

    session.sqlContext
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
      .load()
      .createOrReplaceTempView("C")

    val rowsOK = session.sql("select id from A union select id from B").collect()                           // single union: completes
    val rowsHang = session.sql("select id from A union select id from B union select id from C").collect()  // double union: hangs
  }
}



> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
>                 Key: SPARK-16087
>                 URL: https://issues.apache.org/jira/browse/SPARK-16087
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.4.1, 1.6.1
>            Reporter: Kevin Conaway
>            Priority: Critical
>         Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-00000, part-00001, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>            "hdfs://localhost:9000/part-00000",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-00001",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-00000 and part-00001 to a local HDFS instance (I'm just using a dummy [Single Node Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html] locally).
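> If you prefer to do that upload programmatically instead of via the hdfs CLI, a rough sketch (untested; assumes HDFS is listening on hdfs://localhost:9000 as in the repro and the attached part files are in the working directory):
> {code:java}
> import java.net.URI;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> public class UploadParts {
>     public static void main(String[] args) throws Exception {
>         // Connect to the local single-node HDFS instance used by the repro.
>         FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), new Configuration());
>         // Copy the attached sequence-file parts to the paths that SparkBug reads from.
>         fs.copyFromLocalFile(new Path("part-00000"), new Path("/part-00000"));
>         fs.copyFromLocalFile(new Path("part-00001"), new Path("/part-00001"));
>         fs.close();
>     }
> }
> {code}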
> Some things to note:
> - It does not hang if rdd1 is not persisted (a sketch of that variant is below)
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.
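> For contrast, roughly what the non-persisted variant looks like (sketch only, reusing the same SparkContext sc and files as above; uses a Java 8 lambda for brevity). Per the first note, both counts complete:
> {code:java}
> // Same pipeline as above, but without persist(): per the notes, this does not hang.
> JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>     "hdfs://localhost:9000/part-00000",
>     LongWritable.class,
>     BytesWritable.class
> ).mapToPair(tuple -> new Tuple2<>(
>     new LongWritable(tuple._1.get()),
>     new BytesWritable(tuple._2.copyBytes())
> ));
> System.out.println("Before union: " + rdd1.count());
> JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>     "hdfs://localhost:9000/part-00001",
>     LongWritable.class,
>     BytesWritable.class
> );
> System.out.println("After union: " + rdd1.union(rdd2).count());
> {code}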



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org