Posted to issues@spark.apache.org by "Bjørn Jørgensen (Jira)" <ji...@apache.org> on 2022/12/01 19:26:00 UTC

[jira] [Updated] (SPARK-36950) Normalize semi-structured data into tabular tables.

     [ https://issues.apache.org/jira/browse/SPARK-36950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bjørn Jørgensen updated SPARK-36950:
------------------------------------
    Description: 
Many users get semi-structured data from JSON or XML. 
There are some problems with querying this data when it contains nested fields.
In pandas there is a [json_normalize|https://github.com/pandas-dev/pandas/blob/v1.3.3/pandas/io/json/_normalize.py#L112-L353] function that flattens nested dicts. 
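For reference, a minimal sketch of what json_normalize does with nested records (data adapted from the examples further down; assumes only that pandas is installed):

{code:python}
import pandas as pd

data = [
    {
        "state": "Florida",
        "info": {"governor": "Rick Scott"},
        "counties": [
            {"name": "Dade", "population": 12345},
            {"name": "Broward", "population": 40000},
        ],
    },
]

# One row per county, with the parent fields carried along as metadata.
df = pd.json_normalize(data, record_path="counties", meta=["state", ["info", "governor"]])
print(df.columns.tolist())  # ['name', 'population', 'state', 'info.governor']
{code}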

Here are some Stack Overflow examples of this use case: [Flatten Complex Nested JSON (PYSPARK)|https://stackoverflow.com/questions/73599398/flatten-complex-nested-json-pyspark/73666330#73666330]
[Unable to load jsonl nested file into a flattened dataframe|https://stackoverflow.com/questions/73546452/unable-to-load-jsonl-nested-file-into-a-flattened-dataframe/73594355#73594355]

With pandas, users can use this function:

 
{code:python}
import json
import pandas as pd


def flatten_pandas(df_):
    # The same as flatten, but for pandas:
    # find the columns that still contain lists or dicts.
    have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
    have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
    have_nested = len(have_list) + len(have_dict)

    while have_nested != 0:
        if len(have_list) != 0:
            # Explode list columns into one row per element.
            for _ in have_list:
                df_ = df_.explode(_)

        elif len(have_dict) != 0:
            # Expand dict columns into separate, prefixed columns.
            df_ = pd.json_normalize(json.loads(df_.to_json(force_ascii=False, orient="records")), sep=":")

        have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
        have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
        have_nested = len(have_list) + len(have_dict)

    return df_
{code}
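For illustration, a small usage sketch of flatten_pandas with the fitness records used in the examples below (illustrative data only):

{code:python}
data = [
    {"name": "Cole Volk", "fitness": {"height": 130, "weight": 60}},
    {"name": "Faye Raker", "fitness": {"height": 130, "weight": 60}},
]

df = pd.DataFrame(data)
flat = flatten_pandas(df)
print(flat.columns.tolist())  # expected: ['name', 'fitness:height', 'fitness:weight']
{code}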
 

With PySpark or the pandas API on Spark, there is no built-in function for expanding dicts into columns.
These are the functions that I use to do the same in PySpark.
{code:python}
from pyspark.sql.functions import col, explode_outer, map_keys
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.
    .. versionadded:: x.X.X

    Parameters
    ----------
    sep : str
        Delimiter for flattened columns. Default `_`.

    Notes
    -----
    Don't use `.` as `sep`:
    it won't work on nested data frames with more than one level,
    and you will have to use `columns.name`.

    Flattening MapType columns requires finding every key in the column.
    This can be slow.

    Examples
    --------

    data_mixed = [
        {
            "state": "Florida",
            "shortname": "FL",
            "info": {"governor": "Rick Scott"},
            "counties": [
                {"name": "Dade", "population": 12345},
                {"name": "Broward", "population": 40000},
                {"name": "Palm Beach", "population": 60000},
            ],
        },
        {
            "state": "Ohio",
            "shortname": "OH",
            "info": {"governor": "John Kasich"},
            "counties": [
                {"name": "Summit", "population": 1234},
                {"name": "Cuyahoga", "population": 1337},
            ],
        },
    ]

    data_mixed = spark.createDataFrame(data=data_mixed)

    data_mixed.printSchema()

    root
    |-- counties: array (nullable = true)
    |    |-- element: map (containsNull = true)
    |    |    |-- key: string
    |    |    |-- value: string (valueContainsNull = true)
    |-- info: map (nullable = true)
    |    |-- key: string
    |    |-- value: string (valueContainsNull = true)
    |-- shortname: string (nullable = true)
    |-- state: string (nullable = true)


    data_mixed_flat = flatten_test(data_mixed, sep=":")
    data_mixed_flat.printSchema()
    root
    |-- shortname: string (nullable = true)
    |-- state: string (nullable = true)
    |-- counties:name: string (nullable = true)
    |-- counties:population: string (nullable = true)
    |-- info:governor: string (nullable = true)




    data = [
        {
            "id": 1,
            "name": "Cole Volk",
            "fitness": {"height": 130, "weight": 60},
        },
        {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
        {
            "id": 2,
            "name": "Faye Raker",
            "fitness": {"height": 130, "weight": 60},
        },
    ]


    df = spark.createDataFrame(data=data)

    df.printSchema()

    root
    |-- fitness: map (nullable = true)
    |    |-- key: string
    |    |-- value: long (valueContainsNull = true)
    |-- id: long (nullable = true)
    |-- name: string (nullable = true)

    df_flat = flatten_test(df, sep=":")

    df_flat.printSchema()

    root
    |-- id: long (nullable = true)
    |-- name: string (nullable = true)
    |-- fitness:height: long (nullable = true)
    |-- fitness:weight: long (nullable = true)

    data_struct = [
            (("James",None,"Smith"),"OH","M"),
            (("Anna","Rose",""),"NY","F"),
            (("Julia","","Williams"),"OH","F"),
            (("Maria","Anne","Jones"),"NY","M"),
            (("Jen","Mary","Brown"),"NY","M"),
            (("Mike","Mary","Williams"),"OH","M")
            ]


    schema = StructType([
        StructField('name', StructType([
            StructField('firstname', StringType(), True),
            StructField('middlename', StringType(), True),
            StructField('lastname', StringType(), True)
            ])),
        StructField('state', StringType(), True),
        StructField('gender', StringType(), True)
        ])

    df_struct = spark.createDataFrame(data = data_struct, schema = schema)

    df_struct.printSchema()

    root
    |-- name: struct (nullable = true)
    |    |-- firstname: string (nullable = true)
    |    |-- middlename: string (nullable = true)
    |    |-- lastname: string (nullable = true)
    |-- state: string (nullable = true)
    |-- gender: string (nullable = true)

    df_struct_flat = flatten_test(df_struct, sep=":")

    df_struct_flat.printSchema()

    root
    |-- state: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- name:firstname: string (nullable = true)
    |-- name:middlename: string (nullable = true)
    |-- name:lastname: string (nullable = true)
    """
    # compute complex fields (ArrayType, StructType and MapType) in the schema
    complex_fields = dict(
        [
            (field.name, field.dataType)
            for field in df.schema.fields
            if type(field.dataType) == ArrayType
            or type(field.dataType) == StructType
            or type(field.dataType) == MapType
        ]
    )

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        # print ("Processing :"+col_name+" Type : "+str(type(complex_fields[col_name])))

        # if StructType, then convert all sub-fields to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [
                col(col_name + "." + k).alias(col_name + sep + k)
                for k in [n.name for n in complex_fields[col_name]]
            ]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array Elements as Rows using the explode function
        # i.e. explode Arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # if MapType, then convert all keys to columns,
        # i.e. flatten maps
        elif type(complex_fields[col_name]) == MapType:
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = list(map(lambda row: row[0], keys_df.collect()))
            key_cols = list(
                map(
                    lambda f: col(col_name).getItem(f).alias(str(col_name + sep + f)),
                    keys,
                )
            )
            drop_column_list = [col_name]
            df = df.select(
                [
                    col_name
                    for col_name in df.columns
                    if col_name not in drop_column_list
                ]
                + key_cols
            )

        # recompute remaining Complex Fields in Schema
        complex_fields = dict(
            [
                (field.name, field.dataType)
                for field in df.schema.fields
                if type(field.dataType) == ArrayType
                or type(field.dataType) == StructType
                or type(field.dataType) == MapType
            ]
        )

    return df
{code}
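As a quick end-to-end sketch of the intended use (the file path is a placeholder, and `spark` is assumed to be an active SparkSession):

{code:python}
# Hypothetical input file containing nested records.
df = spark.read.json("/tmp/nested_records.json")

df_flat = flatten_test(df, sep=":")
df_flat.printSchema()
{code}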
{code:python}
from copy import copy

from pyspark.sql import DataFrame
from pyspark.sql.types import ArrayType, DataType, StructField, StructType


# We take a dataframe and return a new one with the required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
    # Returns a new sanitized field name (this function can be anything really)
    def sanitizeFieldName(s: str) -> str:
        return (
            s.replace("-", "_")
            .replace("&", "_")
            .replace('"', "_")
            .replace("[", "_")
            .replace("]", "_")
            .replace(".", "_")
        )

    # We call this on all fields to create a copy and to perform any changes we might
    # want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types
        field.dataType = cleanSchema(field.dataType)
        return field

    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse otherwise we can return since
        # we've reached the leaf node
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top level fields
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now that we have the new schema, we can create a new DataFrame
    # using the old frame's RDD as the data and the new schema as its schema.
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
{code}
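A small usage sketch for cleanDataFrame (the column names are made up to show the sanitizing; `spark` is assumed to be an active SparkSession):

{code:python}
from pyspark.sql.types import LongType, StringType, StructField, StructType

schema = StructType([
    StructField("user.id", LongType(), True),      # "." collides with Spark's nested-field syntax
    StructField("user-name", StringType(), True),  # "-" is awkward to reference in SQL
])
raw = spark.createDataFrame([(1, "Cole Volk")], schema)

clean = cleanDataFrame(raw)
print(clean.columns)  # expected: ['user_id', 'user_name']
{code}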
{code:python}
import os


def json_to_norm_with_null(dir_path, path_to_save):
    """Read every .json file in dir_path, clean and flatten it, and append the result to path_to_save."""
    for filename in os.listdir(dir_path):
        if not filename.endswith(".json"):
            continue

        fullname = os.path.join(dir_path, filename)

        df = spark.read.json(fullname)
        df = cleanDataFrame(df)
        df = flatten_test(df, sep=":")
        df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)
{code}
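A hypothetical invocation (both paths are placeholders):

{code:python}
# Flatten every JSON file under the input directory and append the result as normalized JSON.
json_to_norm_with_null("/data/raw_json", "/data/normalized_json")
{code}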
def cleanDataFrame is taken from a post on Stack Overflow.
In def flatten_test, the first two parts (for arrays and structs) are taken from [this gist on GitHub|https://gist.github.com/nmukerje/e65cde41be85470e4b8dfd9a2d6aed50].

The last part, for maps, is inspired by https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
The examples in def flatten_test are taken from the pandas json_normalize documentation.
There is one problem with def flatten_test: if the schemas differ, the dataframes have to be loaded one at a time.


> Normalize semi-structured data into tabular tables.
> ---------------------------------------------------
>
>                 Key: SPARK-36950
>                 URL: https://issues.apache.org/jira/browse/SPARK-36950
>             Project: Spark
>          Issue Type: Improvement
>          Components: Pandas API on Spark, PySpark
>    Affects Versions: 3.4.0
>            Reporter: Bjørn Jørgensen
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org