You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "liang.feng (Jira)" <ji...@apache.org> on 2023/11/13 07:43:00 UTC

[jira] [Updated] (SPARK-45903) Different column orders lead to OOM

     [ https://issues.apache.org/jira/browse/SPARK-45903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liang.feng updated SPARK-45903:
-------------------------------
    Description: 
 
Problem description:
 I am using spark to generate HFile, OOM occurs in the repartitionAndSortWithinPartitions stage, and the input data is the result of sql . The logic is as follows
{code:java}
// code placeholder

  val family = Bytes.toBytes("cf")  
  // colum "a" and "b" are longType,column "c" is a json string
  val sql = "select a, b , c from table "
  spark.sql(sql).flatMap(row => {
    val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
    val v1 = row.getLong(0)
    val v2 = row.getLong(1)
    val v3 = row.getString(2)    

    val rowkey = Bytes.toBytes(v1)
    
    val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
    buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))  
   
    val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
    buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))    

    val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
    buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))    

    buffer.toArray
  })
  .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
  .map(tuple => {
    val kfq = tuple._1
    val value = tuple._2
    
    val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
    val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
    (rowkey, keyValue)
  })
  .saveAsNewAPIHadoopFile(
    tmpPath, 
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    job.getConfiguration()
  ) 


{code}
 

  was:
 
Problem description:
 I am using spark to generate HFile, OOM occurs in the repartitionAndSortWithinPartitions stage, and the input data is the result of sql . The logic is as follows
{code:java}
// code placeholder

val family = Bytes.toBytes("cf")  // 字段a和b都是long类型,c是json字符串
  val sql = "select a, b , c from table "
  spark.sql(sql).flatMap(row => {
    val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
    val v1 = row.getLong(0)
    val v2 = row.getLong(1)
    val v3 = row.getString(2)    val rowkey = Bytes.toBytes(v1)
    
    val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
    buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))    val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
    buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))    val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
    buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))    buffer.toArray
  })
  .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
  .map(tuple => {
    val kfq = tuple._1
    val value = tuple._2
    
    val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
    val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
    (rowkey, keyValue)
  })
  .saveAsNewAPIHadoopFile(
    tmpPath, 
    classOf[ImmutableBytesWritable],
    classOf[KeyValue],
    classOf[HFileOutputFormat2],
    job.getConfiguration()
  ) {code}
 


> Different column orders lead to OOM
> -----------------------------------
>
>                 Key: SPARK-45903
>                 URL: https://issues.apache.org/jira/browse/SPARK-45903
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.2, 2.4.1, 3.0.1
>            Reporter: liang.feng
>            Priority: Major
>
>  
> Problem description:
>  I am using spark to generate HFile, OOM occurs in the repartitionAndSortWithinPartitions stage, and the input data is the result of sql . The logic is as follows
> {code:java}
> // code placeholder
>   val family = Bytes.toBytes("cf")  
>   // colum "a" and "b" are longType,column "c" is a json string
>   val sql = "select a, b , c from table "
>   spark.sql(sql).flatMap(row => {
>     val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
>     val v1 = row.getLong(0)
>     val v2 = row.getLong(1)
>     val v3 = row.getString(2)    
>     val rowkey = Bytes.toBytes(v1)
>     
>     val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
>     buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))  
>    
>     val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
>     buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))    
>     val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
>     buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))    
>     buffer.toArray
>   })
>   .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
>   .map(tuple => {
>     val kfq = tuple._1
>     val value = tuple._2
>     
>     val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
>     val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
>     (rowkey, keyValue)
>   })
>   .saveAsNewAPIHadoopFile(
>     tmpPath, 
>     classOf[ImmutableBytesWritable],
>     classOf[KeyValue],
>     classOf[HFileOutputFormat2],
>     job.getConfiguration()
>   ) 
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org