You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "liang.feng (Jira)" <ji...@apache.org> on 2023/11/13 07:43:00 UTC
[jira] [Updated] (SPARK-45903) Different column orders lead to OOM
[ https://issues.apache.org/jira/browse/SPARK-45903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liang.feng updated SPARK-45903:
-------------------------------
Description:
Problem description:
I am using Spark to generate HFiles. An OOM occurs in the repartitionAndSortWithinPartitions stage; the input data is the result of a SQL query. The logic is as follows:
{code:java}
// code placeholder
val family = Bytes.toBytes("cf")
// columns "a" and "b" are LongType; column "c" is a JSON string
val sql = "select a, b , c from table "
spark.sql(sql).flatMap(row => {
val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
val v1 = row.getLong(0)
val v2 = row.getLong(1)
val v3 = row.getString(2)
val rowkey = Bytes.toBytes(v1)
val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))
val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))
val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))
buffer.toArray
})
.repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
.map(tuple => {
val kfq = tuple._1
val value = tuple._2
val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
(rowkey, keyValue)
})
.saveAsNewAPIHadoopFile(
tmpPath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration()
)
{code}
was:
Problem description:
I am using Spark to generate HFiles. An OOM occurs in the repartitionAndSortWithinPartitions stage; the input data is the result of a SQL query. The logic is as follows:
{code:java}
// code placeholder
val family = Bytes.toBytes("cf") // 字段a和b都是long类型,c是json字符串
val sql = "select a, b , c from table "
spark.sql(sql).flatMap(row => {
val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
val v1 = row.getLong(0)
val v2 = row.getLong(1)
val v3 = row.getString(2) val rowkey = Bytes.toBytes(v1)
val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1))) val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2))) val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3))) buffer.toArray
})
.repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
.map(tuple => {
val kfq = tuple._1
val value = tuple._2
val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
(rowkey, keyValue)
})
.saveAsNewAPIHadoopFile(
tmpPath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration()
) {code}
> Different column orders lead to OOM
> -----------------------------------
>
> Key: SPARK-45903
> URL: https://issues.apache.org/jira/browse/SPARK-45903
> Project: Spark
> Issue Type: Question
> Components: Spark Core, SQL
> Affects Versions: 2.3.2, 2.4.1, 3.0.1
> Reporter: liang.feng
> Priority: Major
>
>
> Problem description:
> I am using Spark to generate HFiles. An OOM occurs in the repartitionAndSortWithinPartitions stage; the input data is the result of a SQL query. The logic is as follows:
> {code:java}
> // code placeholder
> val family = Bytes.toBytes("cf")
> // columns "a" and "b" are LongType; column "c" is a JSON string
> val sql = "select a, b , c from table "
> spark.sql(sql).flatMap(row => {
> val buffer = new ArrayBuffer[Tuple2[KeyFamilyQualifier, ImmutableBytesWritable]]
> val v1 = row.getLong(0)
> val v2 = row.getLong(1)
> val v3 = row.getString(2)
> val rowkey = Bytes.toBytes(v1)
>
> val kfq1 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v1"))
> buffer += Tuple2(kfq1, new ImmutableBytesWritable(Bytes.toBytes(v1)))
>
> val kfq2 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v2"))
> buffer += Tuple2(kfq2, new ImmutableBytesWritable(Bytes.toBytes(v2)))
> val kfq3 = new KeyFamilyQualifier(rowkey, family, Bytes.toBytes("v3"))
> buffer += Tuple2(kfq3, new ImmutableBytesWritable(Bytes.toBytes(v3)))
> buffer.toArray
> })
> .repartitionAndSortWithinPartitions(new BulkLoadPartitioner(startKeys))
> .map(tuple => {
> val kfq = tuple._1
> val value = tuple._2
>
> val rowkey = new ImmutableBytesWritable(kfq.getRowKey)
> val keyValue = new KeyValue(kfq.getRowKey(), kfq.getFamily(), kfq.getQualifier(), value.get())
> (rowkey, keyValue)
> })
> .saveAsNewAPIHadoopFile(
> tmpPath,
> classOf[ImmutableBytesWritable],
> classOf[KeyValue],
> classOf[HFileOutputFormat2],
> job.getConfiguration()
> )
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org