Posted to issues@spark.apache.org by "Bjørn Jørgensen (Jira)" <ji...@apache.org> on 2022/12/01 19:26:00 UTC
[jira] [Updated] (SPARK-36950) Normalize semi-structured data into tabular tables.
[ https://issues.apache.org/jira/browse/SPARK-36950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bjørn Jørgensen updated SPARK-36950:
------------------------------------
Description:
Many users get semi-nested data from JSON or XML.
Querying this data is problematic when it contains nested fields.
In pandas there is a [json_normalize|https://github.com/pandas-dev/pandas/blob/v1.3.3/pandas/io/json/_normalize.py#L112-L353] function that flattens nested dicts.
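For example, a minimal json_normalize call (a sketch, using made-up records shaped like the docstring examples further down) turns one level of nesting into prefixed columns:
{code:python}
import pandas as pd

data = [{"id": 1, "fitness": {"height": 130, "weight": 60}}]
df = pd.json_normalize(data, sep=":")
print(df.columns.tolist())
# ['id', 'fitness:height', 'fitness:weight']
{code}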
Here are some examples of this use case: [Flatten Complex Nested JSON (PYSPARK)|https://stackoverflow.com/questions/73599398/flatten-complex-nested-json-pyspark/73666330#73666330]
[Unable to load jsonl nested file into a flattened dataframe|https://stackoverflow.com/questions/73546452/unable-to-load-jsonl-nested-file-into-a-flattened-dataframe/73594355#73594355]
With pandas, users can use this function:
{code:python}
import json

import pandas as pd


def flatten_pandas(df_):
    # The same as flatten, but for pandas
    have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
    have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
    have_nested = len(have_list) + len(have_dict)

    while have_nested != 0:
        # explode every list column into rows
        if len(have_list) != 0:
            for _ in have_list:
                df_ = df_.explode(_)
        # expand dict columns into prefixed flat columns
        elif len(have_dict) != 0:
            df_ = pd.json_normalize(
                json.loads(df_.to_json(force_ascii=False, orient="records")), sep=":"
            )

        have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
        have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
        have_nested = len(have_list) + len(have_dict)
    return df_
{code}
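For instance, on the fitness records used again below (a sketch, assuming the imports above), flatten_pandas yields one flat column per nested key:
{code:python}
data = [
    {"id": 1, "name": "Cole Volk", "fitness": {"height": 130, "weight": 60}},
    {"id": 2, "name": "Faye Raker", "fitness": {"height": 130, "weight": 60}},
]
flat = flatten_pandas(pd.DataFrame(data))
print(flat.columns.tolist())
# ['id', 'name', 'fitness:height', 'fitness:weight']
{code}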
PySpark and the pandas API on Spark have no equivalent function for expanding dicts into columns.
These are the functions that I'm using to do the same in pyspark.
{code:python}
from pyspark.sql.functions import *
from pyspark.sql.types import *


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.

    .. versionadded:: x.X.X

    Parameters
    ----------
    sep : str
        Delimiter for flattened columns. Default `_`.

    Notes
    -----
    Don't use `.` as `sep`.
    It won't work on nested data frames with more than one level,
    and you will have to use `columns.name`.

    Flattening MapTypes has to find every key in the column.
    This can be slow.

    Examples
    --------
    data_mixed = [
        {
            "state": "Florida",
            "shortname": "FL",
            "info": {"governor": "Rick Scott"},
            "counties": [
                {"name": "Dade", "population": 12345},
                {"name": "Broward", "population": 40000},
                {"name": "Palm Beach", "population": 60000},
            ],
        },
        {
            "state": "Ohio",
            "shortname": "OH",
            "info": {"governor": "John Kasich"},
            "counties": [
                {"name": "Summit", "population": 1234},
                {"name": "Cuyahoga", "population": 1337},
            ],
        },
    ]

    data_mixed = spark.createDataFrame(data=data_mixed)
    data_mixed.printSchema()
    root
     |-- counties: array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: string
     |    |    |-- value: string (valueContainsNull = true)
     |-- info: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)
     |-- shortname: string (nullable = true)
     |-- state: string (nullable = true)

    data_mixed_flat = flatten_test(data_mixed, sep=":")
    data_mixed_flat.printSchema()
    root
     |-- shortname: string (nullable = true)
     |-- state: string (nullable = true)
     |-- counties:name: string (nullable = true)
     |-- counties:population: string (nullable = true)
     |-- info:governor: string (nullable = true)

    data = [
        {
            "id": 1,
            "name": "Cole Volk",
            "fitness": {"height": 130, "weight": 60},
        },
        {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
        {
            "id": 2,
            "name": "Faye Raker",
            "fitness": {"height": 130, "weight": 60},
        },
    ]

    df = spark.createDataFrame(data=data)
    df.printSchema()
    root
     |-- fitness: map (nullable = true)
     |    |-- key: string
     |    |-- value: long (valueContainsNull = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)

    df_flat = flatten_test(df, sep=":")
    df_flat.printSchema()
    root
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- fitness:height: long (nullable = true)
     |-- fitness:weight: long (nullable = true)

    data_struct = [
        (("James", None, "Smith"), "OH", "M"),
        (("Anna", "Rose", ""), "NY", "F"),
        (("Julia", "", "Williams"), "OH", "F"),
        (("Maria", "Anne", "Jones"), "NY", "M"),
        (("Jen", "Mary", "Brown"), "NY", "M"),
        (("Mike", "Mary", "Williams"), "OH", "M"),
    ]
    schema = StructType([
        StructField("name", StructType([
            StructField("firstname", StringType(), True),
            StructField("middlename", StringType(), True),
            StructField("lastname", StringType(), True),
        ])),
        StructField("state", StringType(), True),
        StructField("gender", StringType(), True),
    ])

    df_struct = spark.createDataFrame(data=data_struct, schema=schema)
    df_struct.printSchema()
    root
     |-- name: struct (nullable = true)
     |    |-- firstname: string (nullable = true)
     |    |-- middlename: string (nullable = true)
     |    |-- lastname: string (nullable = true)
     |-- state: string (nullable = true)
     |-- gender: string (nullable = true)

    df_struct_flat = flatten_test(df_struct, sep=":")
    df_struct_flat.printSchema()
    root
     |-- state: string (nullable = true)
     |-- gender: string (nullable = true)
     |-- name:firstname: string (nullable = true)
     |-- name:middlename: string (nullable = true)
     |-- name:lastname: string (nullable = true)
    """
    # compute complex fields (arrays, structs and map types) in the schema
    complex_fields = dict(
        [
            (field.name, field.dataType)
            for field in df.schema.fields
            if type(field.dataType) == ArrayType
            or type(field.dataType) == StructType
            or type(field.dataType) == MapType
        ]
    )
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        # print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType then convert all sub elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [
                col(col_name + "." + k).alias(col_name + sep + k)
                for k in [n.name for n in complex_fields[col_name]]
            ]
            df = df.select("*", *expanded).drop(col_name)
        # if ArrayType then add the array elements as rows using the explode function,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))
        # if MapType then convert all sub elements to columns,
        # i.e. flatten maps
        elif type(complex_fields[col_name]) == MapType:
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = list(map(lambda row: row[0], keys_df.collect()))
            key_cols = list(
                map(
                    lambda f: col(col_name).getItem(f).alias(str(col_name + sep + f)),
                    keys,
                )
            )
            drop_column_list = [col_name]
            df = df.select(
                [
                    col_name
                    for col_name in df.columns
                    if col_name not in drop_column_list
                ]
                + key_cols
            )
        # recompute remaining complex fields in the schema
        complex_fields = dict(
            [
                (field.name, field.dataType)
                for field in df.schema.fields
                if type(field.dataType) == ArrayType
                or type(field.dataType) == StructType
                or type(field.dataType) == MapType
            ]
        )
    return df
{code}
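A typical call pattern (a sketch with a hypothetical input path) is to read JSON directly and flatten everything the reader inferred as structs, arrays or maps:
{code:python}
df = spark.read.json("/tmp/nested_input.json")  # hypothetical path
df_flat = flatten_test(df, sep=":")
df_flat.printSchema()
{code}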
{code:python}
from copy import copy

from pyspark.sql import DataFrame
from pyspark.sql.types import ArrayType, DataType, StructField, StructType


# We take a dataframe and return a new one with the required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
    # Returns a new sanitized field name (this function can be anything really)
    def sanitizeFieldName(s: str) -> str:
        return (
            s.replace("-", "_")
            .replace("&", "_")
            .replace('"', "_")
            .replace("[", "_")
            .replace("]", "_")
            .replace(".", "_")
        )

    # We call this on all fields to create a copy and to perform any changes we
    # might want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types
        field.dataType = cleanSchema(field.dataType)
        return field

    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse; otherwise we can
        # return since we've reached a leaf node
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top-level fields
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now that we have the new schema, we can create a new DataFrame by using
    # the old frame's RDD as data and the new schema as the schema for the data
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
{code}
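As a quick illustration (made-up field names), cleanDataFrame rewrites characters that would otherwise break dotted column references:
{code:python}
import json

raw = [{"first-name": "Ada", "details": {"years.active": 10}}]
df = spark.read.json(spark.sparkContext.parallelize([json.dumps(r) for r in raw]))
cleanDataFrame(df).printSchema()
# root
#  |-- details: struct (nullable = true)
#  |    |-- years_active: long (nullable = true)
#  |-- first_name: string (nullable = true)
{code}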
{code:python}
import os


def json_to_norm_with_null(dir_path, path_to_save):
    # Read, clean and flatten each JSON file separately, then append the
    # result to one output path.
    for filename in os.listdir(dir_path):
        if not filename.endswith(".json"):
            continue
        fullname = os.path.join(dir_path, filename)
        df = spark.read.json(fullname)
        df = cleanDataFrame(df)
        df = flatten_test(df, sep=":")
        df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)
{code}
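Calling it is just (hypothetical directories):
{code:python}
json_to_norm_with_null("/data/raw_json", "/data/flat_json")
{code}
Each file is read, cleaned and flattened on its own, so files with different schemas don't interfere with each other's inferred schema.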
cleanDataFrame is taken from a post on Stack Overflow.
In flatten_test, the first two parts, for arrays and structs, are taken from [this GitHub gist|https://gist.github.com/nmukerje/e65cde41be85470e4b8dfd9a2d6aed50].
The last part, for maps, is inspired by https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
The examples in flatten_test are taken from the pandas json_normalize documentation.
There is one problem with flatten_test: when files have different schemas, the dataframes have to be loaded and flattened one by one.
> Normalize semi-structured data into tabular tables.
> ---------------------------------------------------
>
> Key: SPARK-36950
> URL: https://issues.apache.org/jira/browse/SPARK-36950
> Project: Spark
> Issue Type: Improvement
> Components: Pandas API on Spark, PySpark
> Affects Versions: 3.4.0
> Reporter: Bjørn Jørgensen
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)