Posted to dev@spark.apache.org by Richard Xin <ri...@yahoo.com.INVALID> on 2017/01/27 20:15:19 UTC

Re: Issue creating row with java.util.Map type

try
Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getMap(2)); 
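
Row.getMap(i) returns a scala.collection.Map, which is the external type
Spark 2.0's RowEncoder expects for a map<string,int> column, whereas
Row.getJavaMap(i) returns a java.util.Map, which fails the encoder's type
check. A minimal sketch of the contrast inside the map function, using the
column ordinals from the code quoted further down:

    // Rejected at runtime: java.util.Map is not a valid external type for MapType
    // Row bad = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));

    // Accepted: scala.collection.Map passes the RowEncoder's validation
    Row ok = RowFactory.create(row.getString(0), row.getString(1), row.getMap(2));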


Re: Issue creating row with java.util.Map type

Posted by Ankur Srivastava <an...@gmail.com>.
Thank you, Richard, for responding.

I am able to run it successfully using row.getMap, but since I have to
update the map I wanted to use the HashMap API. Is there a way I can do
that? I am also surprised that it works in the first case, where I create
the Dataset from a list of rows, but fails in the map function.

Thanks
Ankur
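
One way to keep the java.util.HashMap API for the update and still satisfy
the encoder is to convert back to a scala.collection.Map just before
building the Row. A sketch under that assumption (Scala 2.11's
JavaConverters; the key "k" and the increment are made up for
illustration):

    import scala.collection.JavaConverters;

    out.map((MapFunction<Row, Row>) row -> {
        // Copy the incoming map into a java.util.HashMap so it can be updated
        java.util.Map<String, Integer> freq =
                new java.util.HashMap<>(row.<String, Integer>getJavaMap(2));
        freq.merge("k", 1, Integer::sum);  // hypothetical update
        // Convert back to scala.collection.Map, the external type the encoder accepts
        return RowFactory.create(row.getString(0), row.getString(1),
                JavaConverters.mapAsScalaMapConverter(freq).asScala());
    }, rowEncoder).show();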

On Fri, Jan 27, 2017 at 12:15 PM, Richard Xin <ri...@yahoo.com> wrote:

> try
> Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getMap(2));
>
>
>
> On Friday, January 27, 2017 10:52 AM, Ankur Srivastava <ankur.srivastava@gmail.com> wrote:
>
>
> + DEV Mailing List
>
> On Thu, Jan 26, 2017 at 5:12 PM, Ankur Srivastava <ankur.srivastava@gmail.com> wrote:
>
> Hi,
>
> I am trying to map a Dataset whose rows have a map attribute. When I
> try to create a Row with the map attribute I get cast errors. I am able to
> reproduce the issue with the sample code below. The surprising thing is
> that with the same schema I am able to create a Dataset from the List of rows.
>
> I am on Spark 2.0 and Scala 2.11.
>
> public static void main(String[] args) {
>     StructType schema = new StructType().add("src", DataTypes.StringType)
>             .add("dst", DataTypes.StringType)
>             .add("freq", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
>     List<Row> inputData = new ArrayList<>();
>     inputData.add(RowFactory.create("1", "2", new HashMap<>()));
>     SparkSession sparkSession = SparkSession
>             .builder()
>             .appName("IPCountFilterTest")
>             .master("local")
>             .getOrCreate();
>
>     Dataset<Row> out = sparkSession.createDataFrame(inputData, schema);
>     out.show();
>
>     Encoder<Row> rowEncoder = RowEncoder.apply(schema);
>     out.map((MapFunction<Row, Row>) row -> {
>         Row newRow = RowFactory.create(row.getString(0), row.getString(1), new HashMap<String, Integer>());
>
>         //Row newRow = RowFactory.create(row.getString(0), row.getString(1), row.getJavaMap(2));
>
>         return newRow;
>     }, rowEncoder).show();
> }
>
> Below is the error:
>
> 17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> Thanks
> Ankur
>
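
A note on the asymmetry above, based on how Spark 2.0 converts external
objects (an explanation inferred from the converters involved, not stated
in the thread): createDataFrame(List<Row>, schema) goes through
CatalystTypeConverters, whose map converter accepts java.util.Map as well
as scala.collection.Map, while map(..., rowEncoder) uses the RowEncoder's
generated serializer, which validates the external type and accepts only
scala.collection.Map for a MapType column. That is why the same
HashMap-backed Row is accepted when the Dataset is created but rejected
inside the map function.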