You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Krystian Zawistowski (JIRA)" <ji...@apache.org> on 2018/01/19 15:09:00 UTC

[jira] [Updated] (SPARK-23156) Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB

     [ https://issues.apache.org/jira/browse/SPARK-23156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Krystian Zawistowski updated SPARK-23156:
-----------------------------------------
    Description: 
I am getting this trying to generate a random DataFrame  (300 columns, 5000 rows, Ints, Floats and Timestamps in equal ratios). This is similar (but not identical) to SPARK-18492 and few tickets more that should be done in 2.1.1.

Part of the logs below. They contain hundreds of millions of lines of generated code, apparently for each of the 1500 000 fields of the dataframe which is very suspicious. 
{code:java}
18/01/19 06:33:15 INFO CodeGenerator: Code generated in 246.168393 ms$
18/01/19 06:33:21 ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB$
/* 001 */ public java.lang.Object generate(Object[] references) {$
/* 002 */ return new SpecificUnsafeProjection(references);$
/* 003 */ }$
/* 004 */$
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {$
/* 006 */$
/* 007 */ private Object[] references;$
/* 008 */ private org.apache.spark.util.random.XORShiftRandom rng;$
/* 009 */ private org.apache.spark.util.random.XORShiftRandom rng1;$
/* 010 */ private org.apache.spark.util.random.XORShiftRandom rng2;$
/* 011 */ private org.apache.spark.util.random.XORShiftRandom rng3;$
/* 012 */ private org.apache.spark.util.random.XORShiftRandom rng4;$
{code}
Reproduction:
{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
class RandomData(val numberOfColumns: Int, val numberOfRows: Int) extends Serializable {
  private val minEpoch = Timestamp.valueOf("1800-01-01 00:00:00").getTime
  private val maxEpoch = Timestamp.valueOf("2200-01-01 00:00:00").getTime
  val idColumn = "id"
  import org.apache.spark.sql.functions._
  def generateFeatureLearningData(path: String): Unit = {
  val spark: SparkSession = SparkSession.builder().getOrCreate()
  materializeSourceFeatureLearningTable(spark).write.parquet(path + "/source")
  materializeTargetTable(spark).write.parquet(path + "/target")
}

def generateModelLearningData(path: String): Unit = {
  val spark: SparkSession = SparkSession.builder().getOrCreate()
  materializeTargetTable(spark).write.parquet(path + "/target")
  materializeSourceModelLearningTable(spark).write.parquet(path + "/source")
}

private def materializeSourceFeatureLearningTable(spark: SparkSession): DataFrame = {
  var sourceDF = spark.sqlContext.range(0, numberOfRows).withColumnRenamed("id", 
 idColumn)
  val columns = sourceDF(idColumn) +: (0 until numberOfColumns)
  .flatMap(x => Seq(getTimeColumn(x), getNumberColumn(x), getCategoryColumn(x)))
sourceDF.select(columns: _*)
}

private def getTimeColumn(seed: Int): Column = {
  val uniqueSeed = seed + numberOfColumns * 3
  rand(seed = uniqueSeed).multiply(maxEpoch - 
minEpoch).divide(1000).cast("long").plus(minEpoch / 1000).cast(TimestampType).alias(s"time$seed")
}
private def getNumberColumn(seed: Int, namePrefix: String = "number"): Column = {
  val uniqueSeed = seed + numberOfColumns * 4
  randn(seed = uniqueSeed).alias(s"$namePrefix$seed")
}
private def getCategoryColumn(seed: Int): Column = {
  val uniqueSeed = seed + numberOfColumns * 4
  rand(seed = uniqueSeed).multiply(100).cast("int").alias(s"category$seed")
}

}

object GenerateData{
def main(args: Array[String]): Unit = {
  new RandomData(args(0).toInt, args(1).toInt).generateFeatureLearningData(args(2))
}
}

{code}
Please package a jar and run as follows:
{code:java}
spark-submit --master yarn \
 --driver-memory 12g \
 --executor-memory 12g \
 --deploy-mode cluster \
 --class GenerateData \
 --master yarn \
 100 5000 "hdfs:///tmp/parquet"
{code}

  was:
I am getting this trying to generate a random DataFrame  (300 columns, 5000 rows, Ints, Floats and Timestamps in equal ratios). This is similar (but not identical) to SPARK-18492 and few tickets more that should be done in 2.1.1.

Part of the logs below. They contain hundreds of millions of lines of generated code, apparently for each of the 1500 000 fields of the dataframe which is very suspicious. 
{code:java}
18/01/19 06:33:15 INFO CodeGenerator: Code generated in 246.168393 ms$
18/01/19 06:33:21 ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB$
/* 001 */ public java.lang.Object generate(Object[] references) {$
/* 002 */ return new SpecificUnsafeProjection(references);$
/* 003 */ }$
/* 004 */$
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {$
/* 006 */$
/* 007 */ private Object[] references;$
/* 008 */ private org.apache.spark.util.random.XORShiftRandom rng;$
/* 009 */ private org.apache.spark.util.random.XORShiftRandom rng1;$
/* 010 */ private org.apache.spark.util.random.XORShiftRandom rng2;$
/* 011 */ private org.apache.spark.util.random.XORShiftRandom rng3;$
/* 012 */ private org.apache.spark.util.random.XORShiftRandom rng4;$
{code}
Reproduction:
{code:java}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
class RandomData(val numberOfColumns: Int, val numberOfRows: Int) extends Serializable {


private val minEpoch = Timestamp.valueOf("1800-01-01 00:00:00").getTime
private val maxEpoch = Timestamp.valueOf("2200-01-01 00:00:00").getTime
val idColumn = "id"
import org.apache.spark.sql.functions._
def generateFeatureLearningData(path: String): Unit = {
val spark: SparkSession = SparkSession.builder().getOrCreate()
materializeSourceFeatureLearningTable(spark).write.parquet(path + "/source")
materializeTargetTable(spark).write.parquet(path + "/target")
}

def generateModelLearningData(path: String): Unit = {
val spark: SparkSession = SparkSession.builder().getOrCreate()
materializeTargetTable(spark).write.parquet(path + "/target")
materializeSourceModelLearningTable(spark).write.parquet(path + "/source")
}

private def materializeSourceFeatureLearningTable(spark: SparkSession): DataFrame = {
var sourceDF = spark.sqlContext.range(0, numberOfRows).withColumnRenamed("id", idColumn)
val columns = sourceDF(idColumn) +: (0 until numberOfColumns)
.flatMap(x => Seq(getTimeColumn(x), getNumberColumn(x), getCategoryColumn(x)))
sourceDF.select(columns: _*)
}

private def getTimeColumn(seed: Int): Column = {
val uniqueSeed = seed + numberOfColumns * 3
rand(seed = uniqueSeed).multiply(maxEpoch - minEpoch).divide(1000).cast("long").plus(minEpoch / 1000).cast(TimestampType).alias(s"time$seed")
}
private def getNumberColumn(seed: Int, namePrefix: String = "number"): Column = {
val uniqueSeed = seed + numberOfColumns * 4
randn(seed = uniqueSeed).alias(s"$namePrefix$seed")
}
private def getCategoryColumn(seed: Int): Column = {
val uniqueSeed = seed + numberOfColumns * 4
rand(seed = uniqueSeed).multiply(100).cast("int").alias(s"category$seed")
}

}

object GenerateData{
def main(args: Array[String]): Unit = {
new RandomData(args(0).toInt, args(1).toInt).generateFeatureLearningData(args(2))
}
}

{code}
Please package a jar and run as follows:
{code}
spark-submit --master yarn --driver-memory 12g  --executor-memory 12g --deploy-mode cluster --class GenerateData --master yarn  100 5000 "hdfs:///tmp/parquet"
{code}


>  Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
> ------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23156
>                 URL: https://issues.apache.org/jira/browse/SPARK-23156
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Submit, SQL
>    Affects Versions: 2.1.1, 2.1.2
>         Environment: Ubuntu 16.04, Scala 2.11, Java 8, 8-node YARN cluster.
>            Reporter: Krystian Zawistowski
>            Priority: Major
>
> I am getting this trying to generate a random DataFrame  (300 columns, 5000 rows, Ints, Floats and Timestamps in equal ratios). This is similar (but not identical) to SPARK-18492 and few tickets more that should be done in 2.1.1.
> Part of the logs below. They contain hundreds of millions of lines of generated code, apparently for each of the 1500 000 fields of the dataframe which is very suspicious. 
> {code:java}
> 18/01/19 06:33:15 INFO CodeGenerator: Code generated in 246.168393 ms$
> 18/01/19 06:33:21 ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "initialize(I)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB$
> /* 001 */ public java.lang.Object generate(Object[] references) {$
> /* 002 */ return new SpecificUnsafeProjection(references);$
> /* 003 */ }$
> /* 004 */$
> /* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {$
> /* 006 */$
> /* 007 */ private Object[] references;$
> /* 008 */ private org.apache.spark.util.random.XORShiftRandom rng;$
> /* 009 */ private org.apache.spark.util.random.XORShiftRandom rng1;$
> /* 010 */ private org.apache.spark.util.random.XORShiftRandom rng2;$
> /* 011 */ private org.apache.spark.util.random.XORShiftRandom rng3;$
> /* 012 */ private org.apache.spark.util.random.XORShiftRandom rng4;$
> {code}
> Reproduction:
> {code:java}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> class RandomData(val numberOfColumns: Int, val numberOfRows: Int) extends Serializable {
>   private val minEpoch = Timestamp.valueOf("1800-01-01 00:00:00").getTime
>   private val maxEpoch = Timestamp.valueOf("2200-01-01 00:00:00").getTime
>   val idColumn = "id"
>   import org.apache.spark.sql.functions._
>   def generateFeatureLearningData(path: String): Unit = {
>   val spark: SparkSession = SparkSession.builder().getOrCreate()
>   materializeSourceFeatureLearningTable(spark).write.parquet(path + "/source")
>   materializeTargetTable(spark).write.parquet(path + "/target")
> }
> def generateModelLearningData(path: String): Unit = {
>   val spark: SparkSession = SparkSession.builder().getOrCreate()
>   materializeTargetTable(spark).write.parquet(path + "/target")
>   materializeSourceModelLearningTable(spark).write.parquet(path + "/source")
> }
> private def materializeSourceFeatureLearningTable(spark: SparkSession): DataFrame = {
>   var sourceDF = spark.sqlContext.range(0, numberOfRows).withColumnRenamed("id", 
>  idColumn)
>   val columns = sourceDF(idColumn) +: (0 until numberOfColumns)
>   .flatMap(x => Seq(getTimeColumn(x), getNumberColumn(x), getCategoryColumn(x)))
> sourceDF.select(columns: _*)
> }
> private def getTimeColumn(seed: Int): Column = {
>   val uniqueSeed = seed + numberOfColumns * 3
>   rand(seed = uniqueSeed).multiply(maxEpoch - 
> minEpoch).divide(1000).cast("long").plus(minEpoch / 1000).cast(TimestampType).alias(s"time$seed")
> }
> private def getNumberColumn(seed: Int, namePrefix: String = "number"): Column = {
>   val uniqueSeed = seed + numberOfColumns * 4
>   randn(seed = uniqueSeed).alias(s"$namePrefix$seed")
> }
> private def getCategoryColumn(seed: Int): Column = {
>   val uniqueSeed = seed + numberOfColumns * 4
>   rand(seed = uniqueSeed).multiply(100).cast("int").alias(s"category$seed")
> }
> }
> object GenerateData{
> def main(args: Array[String]): Unit = {
>   new RandomData(args(0).toInt, args(1).toInt).generateFeatureLearningData(args(2))
> }
> }
> {code}
> Please package a jar and run as follows:
> {code:java}
> spark-submit --master yarn \
>  --driver-memory 12g \
>  --executor-memory 12g \
>  --deploy-mode cluster \
>  --class GenerateData \
>  --master yarn \
>  100 5000 "hdfs:///tmp/parquet"
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org