Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2018/01/29 06:24:00 UTC

[jira] [Updated] (SPARK-20254) SPARK-19716 generates unnecessary data conversion for Dataset with primitive array

     [ https://issues.apache.org/jira/browse/SPARK-20254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li updated SPARK-20254:
----------------------------
    Fix Version/s:     (was: 2.3.0)

> SPARK-19716 generates unnecessary data conversion for Dataset with primitive array
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-20254
>                 URL: https://issues.apache.org/jira/browse/SPARK-20254
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Kazuaki Ishizaki
>            Assignee: Kazuaki Ishizaki
>            Priority: Major
>             Fix For: 2.2.0
>
>
> Since SPARK-19716 introduced {{unresolvedmapobjects}}, the current implementation generates {{mapobjects()}} in the {{DeserializeToObject}} node of the analyzed logical plan. This {{mapobjects()}} emits Java code that copies the array element by element into a {{GenericArrayData}}, an unnecessary conversion when the Dataset holds a primitive array.
> cc: [~cloud_fan]
>  
> {code}
> val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
> ds.count
> val ds2 = ds.map(e => e)
> ds2.explain(true)
> ds2.show
> {code}
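> The plan dumps and the generated Java excerpt below come from a session like the one above. As a minimal sketch (assuming a spark-shell session where the {{org.apache.spark.sql.execution.debug}} implicits are in scope), the whole-stage generated code can also be printed directly:
> {code}
> // Sketch: prints the whole-stage-codegen Java for ds2, including the
> // DeserializeToObject code excerpted at the end of this description.
> import org.apache.spark.sql.execution.debug._
> ds2.debugCodegen()
> {code}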
> Plans before SPARK-19716
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject cast(value#2 as array<double>).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject value#2.toDoubleArray, obj#23: [D
>       +- *InMemoryTableScan [value#2]
>             +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                      +- Scan ExternalRDDScan[obj#1]
> {code}
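> For reference, the {{DeserializeToObject value#2.toDoubleArray}} above is a single direct conversion from the internal array representation to a primitive {{double[]}}. A minimal illustrative sketch of the equivalent call on the {{ArrayData}} API (not the generated code itself):
> {code}
> import org.apache.spark.sql.catalyst.util.ArrayData
>
> // Illustrative sketch only: the pre-SPARK-19716 deserializer is effectively a
> // single primitive copy, with no per-element boxing and no intermediate wrapper.
> def deserializeDirect(value: ArrayData): Array[Double] = value.toDoubleArray()
> {code}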
> Plans after SPARK-19716
> {code}
> == Parsed Logical Plan ==
> 'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Analyzed Logical Plan ==
> value: array<double>
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>          +- ExternalRDD [obj#1]
> == Optimized Logical Plan ==
> SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
>    +- DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>             +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                +- Scan ExternalRDDScan[obj#1]
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
> +- *MapElements <function1>, obj#24: [D
>    +- *DeserializeToObject mapobjects(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue4, MapObjects_loopIsNull4, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue4).toDoubleArray, obj#23: [D
>       +- InMemoryTableScan [value#2]
>             +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
>                      +- Scan ExternalRDDScan[obj#1]
> {code}
> Generated Java code after SPARK-19716 (excerpt):
> {code:java}
> ...
> /* 056 */       ArrayData deserializetoobject_value1 = null;
> /* 057 */
> /* 058 */       if (!inputadapter_isNull) {
> /* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
> /* 060 */
> /* 061 */         Double[] deserializetoobject_convertedArray = null;
> /* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
> /* 063 */
> /* 064 */         int deserializetoobject_loopIndex = 0;
> /* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
> /* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
> /* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
> /* 068 */
> /* 069 */           if (MapObjects_loopIsNull2) {
> /* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
> /* 071 */           }
> /* 072 */           if (false) {
> /* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
> /* 074 */           } else {
> /* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
> /* 076 */           }
> /* 077 */
> /* 078 */           deserializetoobject_loopIndex += 1;
> /* 079 */         }
> /* 080 */
> /* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
> /* 082 */       }
> ...
> {code}
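> The line marked {{/*###*/}} is the unnecessary conversion: each element is read from the input, boxed, stored into an {{Object[]}}, and wrapped in a {{GenericArrayData}}, only to be converted back to a primitive array by {{toDoubleArray}}. A rough Scala equivalent of that path (an illustrative sketch assuming an {{ArrayData}} input of non-null doubles, not the actual generated code), to contrast with the single {{toDoubleArray}} call sketched earlier:
> {code}
> import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
>
> // Sketch of the mapobjects() path after SPARK-19716: box every element into an
> // Object[] and wrap it in GenericArrayData before converting back to double[].
> def deserializeViaMapObjects(value: ArrayData): Array[Double] = {
>   val boxed = new Array[Any](value.numElements())
>   var i = 0
>   while (i < value.numElements()) {
>     boxed(i) = value.getDouble(i)  // autoboxes each double
>     i += 1
>   }
>   new GenericArrayData(boxed).toDoubleArray()
> }
> {code}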


