Posted to issues@spark.apache.org by "hejsgpuom62c (JIRA)" <ji...@apache.org> on 2019/03/04 21:42:00 UTC

[jira] [Created] (SPARK-27050) Bean Encoder serializes data in a wrong order if input schema is not ordered

hejsgpuom62c created SPARK-27050:
------------------------------------

             Summary: Bean Encoder serializes data in a wrong order if input schema is not ordered
                 Key: SPARK-27050
                 URL: https://issues.apache.org/jira/browse/SPARK-27050
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: hejsgpuom62c


Steps to reproduce. Define a schema like this:

{code:java}
StructType valid = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " +
  "storages array<struct<timestamp: timestamp, storage: double>>"
);{code}
{code:java}
package com.example;

import java.io.Serializable;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}{code}
{code:java}
package com.example;

import java.io.Serializable;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private java.sql.Timestamp timestamp;
    private Double storage;
}{code}


Create a JSON file with the following content:


{code:java}
[
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "storage": 12.5
      }
    ]
  }
]{code}

Process the data as follows:


{code:java}
Dataset<Entity> ds = spark.read()
    .option("multiline", "true")
    .schema(valid)
    .json("/path/to/file")
    .as(Encoders.bean(Entity.class));

ds.groupByKey((MapFunction<Entity, String>) o -> o.getBroker_name(), Encoders.STRING())
    .reduceGroups((ReduceFunction<Entity>) (e1, e2) -> e1)
    .map((MapFunction<Tuple2<String, Entity>, Entity>) tuple -> tuple._2, Encoders.bean(Entity.class))
    .show(10, false);{code}

The result will be:
{code:java}
+-----------+-----+-----------+--------------------------------------------------------+
|broker_name|order|server_name|storages                                                |
+-----------+-----+-----------+--------------------------------------------------------+
|A1         |1    |S1         |[[7.612815958429577E-309, 148474-03-19 22:14:3232.5248]]|
+-----------+-----+-----------+--------------------------------------------------------+
{code}
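The scrambled row is consistent with a field-order mismatch: {{Encoders.bean}} derives its schema from JavaBeans introspection, which reports properties in alphabetical order (matching the {{broker_name|order|server_name|storages}} column order above), while the DDL schema keeps declaration order. The following Spark-free sketch shows the introspection order; the nested {{Entity}} here is a hand-written, trimmed stand-in for the Lombok bean above (the {{storages}} field is omitted to keep it self-contained):

{code:java}
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BeanOrderDemo {
    // Plain-Java stand-in for the Lombok Entity, declared in the same field order:
    // broker_name, server_name, order.
    public static class Entity implements Serializable {
        private String broker_name;
        private String server_name;
        private Integer order;
        public String getBroker_name() { return broker_name; }
        public void setBroker_name(String v) { broker_name = v; }
        public String getServer_name() { return server_name; }
        public void setServer_name(String v) { server_name = v; }
        public Integer getOrder() { return order; }
        public void setOrder(Integer v) { order = v; }
    }

    // Reports bean property names as java.beans.Introspector sees them,
    // dropping the synthetic "class" property contributed by getClass().
    public static List<String> propertyOrder(Class<?> beanClass) {
        try {
            List<String> names = new ArrayList<>();
            for (PropertyDescriptor pd : Introspector.getBeanInfo(beanClass).getPropertyDescriptors()) {
                if (!"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Declared order is broker_name, server_name, order;
        // introspection reports them alphabetically instead.
        System.out.println(propertyOrder(Entity.class));
    }
}
{code}

One possible workaround (an assumption from the linked discussion, not verified against every Spark version) is to derive the read schema from the encoder itself, e.g. {{spark.read().schema(Encoders.bean(Entity.class).schema())}}, so that the input order and the deserializer's expected order agree.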

Source: https://stackoverflow.com/q/54987724



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
