Posted to user@spark.apache.org by elevy <el...@gmail.com> on 2017/03/18 08:13:27 UTC

[Spark SQL & Core]: RDD to Dataset with 1500 columns: createDataFrame() throws "grows beyond 64 KB" exception

Hello all,
I am using the Spark 2.1.0 release, and I am trying to load a BigTable CSV file
with more than 1,500 columns into our system.

Our flow for doing this is as follows (a rough Java sketch of the whole flow appears right after the list):

•   First, read the data as an RDD
•   Generate a sequential record ID using zipWithIndex()
    (this operation exists only in the RDD API; the Dataset API has a similar
     option, monotonically_increasing_id(), but it assigns IDs per partition and
     the result is not sequential, which is not what we need ☹)
•   Convert the RDD to a Dataset using SparkSession's createDataFrame()
•   This last operation generates code that exceeds the JVM method size limit of 64 KB
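
For concreteness, here is a minimal sketch of that flow in Java. It is an
illustration only, not our production code: the class and method names,
inputPath and columnNames are placeholders, and the split(",") assumes a plain
CSV with no quoted delimiters.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SequentialIdFlow {
    public static Dataset<Row> loadWithSequentialId(SparkSession spark, String inputPath,
                                                    String[] columnNames) {
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Read the raw lines and attach a 0-based, strictly sequential index to each record.
        JavaRDD<Row> rows = jsc.textFile(inputPath)
                .zipWithIndex()                              // JavaPairRDD<line, index>
                .map(t -> {
                    String[] cells = t._1().split(",", -1);  // naive split, no CSV quoting support
                    Object[] values = new Object[cells.length + 1];
                    values[0] = t._2();                      // the sequential record id
                    System.arraycopy(cells, 0, values, 1, cells.length);
                    return RowFactory.create(values);
                });

        // Schema: the generated id column followed by one StringType column per original field.
        List<StructField> fields = new ArrayList<>();
        fields.add(new StructField("record_id", DataTypes.LongType, false, Metadata.empty()));
        for (String name : columnNames) {
            fields.add(new StructField(name, DataTypes.StringType, true, Metadata.empty()));
        }
        StructType schema = DataTypes.createStructType(fields);

        // With 1500+ columns, this is the step that hits the 64 KB codegen limit described below.
        return spark.createDataFrame(rows, schema);
    }
}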

I searched the web for a solution, or even a similar use case, and found a few
issues discussing the 64 KB error, but all of those cases dealt with around 100
columns and were solved in Spark 2.1.0 by shrinking the generated code; none of
them hit the JVM limitation.
 
*Any idea from this forum of experts will be very much appreciated*
There are two types of solution we are looking for:
*1.* How can I work around the size of the generated code?
*OR*
*2.* How can I generate a sequential ID directly on the Dataset?

Our temporary workaround (a rough sketch follows this list):

•   Read the DS as an RDD
•   Generate a sequential ID
•   Write the new data as a text file
•   Read the data back as a Dataset
This workaround costs us about 30% in performance :(
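
For illustration only, a rough sketch of that interim round trip. The class
name, tmpPath and the other parameters are placeholders, and the comma join
assumes the values contain no embedded delimiters.

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class TextRoundTripWorkaround {
    // Interim workaround: materialize the id'ed records as text, then let the CSV
    // reader (rather than createDataFrame) build the wide Dataset.
    public static Dataset<Row> loadViaTextRoundTrip(SparkSession spark, String inputPath,
                                                    String tmpPath, StructType schemaWithId) {
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Prepend a strictly sequential id to every raw line (naive: assumes no embedded commas).
        jsc.textFile(inputPath)
           .zipWithIndex()
           .map(t -> t._2() + "," + t._1())
           .saveAsTextFile(tmpPath);   // the extra write/read is where the ~30% cost comes from

        // Re-read through the CSV reader; schemaWithId must list the id column first.
        return spark.read().schema(schemaWithId).csv(tmpPath);
    }
}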

*Code that reproduces the issue*

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import poc.commons.SparkSessionInitializer;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class RDDConverter {
    private static final int FIELD_COUNT = 1900;

    private Dataset<Row> createBigSchema(SparkSession sparkSession, int startColName, int fieldNumber) {
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        SQLContext sqlContext = new SQLContext(sparkSession.sparkContext());

        // A single row whose values are simply "0".."1899".
        String[] row = IntStream.range(startColName, fieldNumber)
                .mapToObj(String::valueOf)
                .toArray(String[]::new);
        List<String[]> data = Collections.singletonList(row);
        JavaRDD<Row> rdd = jsc.parallelize(data).map(RowFactory::create);

        // One nullable StringType column per field, named "0".."1899".
        StructField[] structFields = IntStream.range(startColName, fieldNumber)
                .mapToObj(i -> new StructField(String.valueOf(i), DataTypes.StringType, true, Metadata.empty()))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

        // This call triggers the "grows beyond 64 KB" codegen exception shown below.
        Dataset<Row> dataSet = sqlContext.createDataFrame(rdd, schema);
        dataSet.show();
        return dataSet;
    }

    public static void main(String[] args) {
        SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer();
        SparkSession sparkSession = sparkSessionInitializer.init();

        RDDConverter rddConverter = new RDDConverter();
        rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT);
    }
}

The exception we are getting:

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 39 common frames omitted
*Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB*
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)







Re: [Spark SQL & Core]: RDD to Dataset with 1500 columns: createDataFrame() throws "grows beyond 64 KB" exception

Posted by Eyal Zituny <ey...@equalum.io>.
Hi Eyal,

You can also try calling repartition(1) before calling the
"monotonically_increasing_id" function. It will probably cause some
performance degradation (depending on the size of the files), but it might
not be as bad as the other workaround.
After adding the ID you can repartition again in order to get better
parallelism.

Example:

spark
  .read
  .csv("data")
  .repartition(1)
  .withColumn("rowid", monotonically_increasing_id())
  .repartition(20)
  .write.csv("output")
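
The same idea rendered in Java, since the reproduction code in the original
mail is Java (a sketch only; the paths and app name are placeholders):

import static org.apache.spark.sql.functions.monotonically_increasing_id;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MonotonicIdExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("rowid-example").getOrCreate();

        Dataset<Row> withId = spark.read()
                .csv("data")                                      // placeholder input path
                .repartition(1)                                   // one partition => ids 0, 1, 2, ... with no gaps
                .withColumn("rowid", monotonically_increasing_id())
                .repartition(20);                                 // restore parallelism once the id is assigned

        withId.write().csv("output");                             // placeholder output path
    }
}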

Regards
Eyal Zituny


Re: [Spark SQL & Core]: RDD to Dataset with 1500 columns: createDataFrame() throws "grows beyond 64 KB" exception

Posted by Kazuaki Ishizaki <IS...@jp.ibm.com>.
Hi,
Here is the latest status of code generation.

When we use the current master, which will become Spark 2.2, the following
exception occurs. The latest version fixes the 64 KB error in this case.
However, we hit another JVM limit: the number of constant pool entries.

Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
        at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)
        at org.codehaus.janino.util.ClassFile.addConstantNameAndTypeInfo(ClassFile.java:439)
        at org.codehaus.janino.util.ClassFile.addConstantMethodrefInfo(ClassFile.java:358)
...

The PR https://github.com/apache/spark/pull/16648 addresses the constant pool
issue, but it has not been merged yet.

Regards,
Kazuaki Ishizaki


