You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bjørn Jørgensen (Jira)" <ji...@apache.org> on 2022/11/30 13:54:00 UTC
[jira] [Updated] (SPARK-36950) Normalize semi-structured data into tabular tables.
[ https://issues.apache.org/jira/browse/SPARK-36950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bjørn Jørgensen updated SPARK-36950:
------------------------------------
Summary: Normalize semi-structured data into tabular tables. (was: Normalize semi-structured data into a flat table.)
> Normalize semi-structured data into tabular tables.
> ---------------------------------------------------
>
> Key: SPARK-36950
> URL: https://issues.apache.org/jira/browse/SPARK-36950
> Project: Spark
> Issue Type: Improvement
> Components: Pandas API on Spark, PySpark
> Affects Versions: 3.4.0
> Reporter: Bjørn Jørgensen
> Priority: Major
>
> Many users get semi-nested data from JSON or XML.
> There are some problems with querying this data, where there are nested fields.
> In pandas there is a [json_normalize|https://github.com/pandas-dev/pandas/blob/v1.3.3/pandas/io/json/_normalize.py#L112-L353] function that flattens nested dicts.
> Here are some examples for the use of those [Flatten Complex Nested JSON (PYSPARK)|https://stackoverflow.com/questions/73599398/flatten-complex-nested-json-pyspark/73666330#73666330]
> [Unable to load jsonl nested file into a flattened dataframe|https://stackoverflow.com/questions/73546452/unable-to-load-jsonl-nested-file-into-a-flattened-dataframe/73594355#73594355]
> With pandas users can use this function
>
> {code:java}
def flatten_pandas(df_):
    """Recursively flatten list- and dict-valued columns of a pandas DataFrame.

    List columns are exploded into rows; dict columns are expanded into
    separate columns via ``pd.json_normalize`` (nested keys joined with
    ``":"``).  The process repeats until no nested values remain.

    Parameters
    ----------
    df_ : pandas.DataFrame
        Input frame; cells may contain lists and/or dicts.

    Returns
    -------
    pandas.DataFrame
        A fully flattened copy of the input.
    """
    have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
    have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()

    while have_list or have_dict:
        if have_list:
            for list_col in have_list:
                df_ = df_.explode(list_col)
        # BUG FIX: the original branch was `elif have_dict != 0:`, which
        # compares a *list* to an int and is therefore always True.  Test
        # the list's emptiness instead.
        elif have_dict:
            # Round-trip through JSON so json_normalize can expand the
            # dicts into ":"-separated columns.
            df_ = pd.json_normalize(
                json.loads(df_.to_json(force_ascii=False, orient="records")),
                sep=":",
            )
        have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
        have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()

    return df_
> {code}
>
> With pyspark or pandas_api we don't have a function implemented for expanding dicts into columns.
> These are the functions that I'm using to do the same in pyspark.
> {code:java}
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
def flatten_test(df, sep="_"):
    """Return a flattened copy of *df*.

    Structs are expanded into one column per field, arrays are exploded
    into rows (keeping nulls), and maps are expanded into one column per
    distinct key, repeating until no complex (Array/Struct/Map) columns
    remain.

    .. versionadded:: x.X.X

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input dataframe.
    sep : str
        Delimiter for flattened column names. Default ``_``.

    Notes
    -----
    Don't use ``.`` as *sep*: it won't work on nested data frames with
    more than one level, and you would have to quote ``columns.name``.

    Flattening MapType columns has to collect every distinct key in the
    column; this can be slow.

    Examples
    --------
    data_mixed = [
        {
            "state": "Florida",
            "shortname": "FL",
            "info": {"governor": "Rick Scott"},
            "counties": [
                {"name": "Dade", "population": 12345},
                {"name": "Broward", "population": 40000},
                {"name": "Palm Beach", "population": 60000},
            ],
        },
        {
            "state": "Ohio",
            "shortname": "OH",
            "info": {"governor": "John Kasich"},
            "counties": [
                {"name": "Summit", "population": 1234},
                {"name": "Cuyahoga", "population": 1337},
            ],
        },
    ]
    data_mixed = spark.createDataFrame(data=data_mixed)

    data_mixed_flat = flatten_test(data_mixed, sep=":")
    data_mixed_flat.printSchema()
    root
    |-- shortname: string (nullable = true)
    |-- state: string (nullable = true)
    |-- counties:name: string (nullable = true)
    |-- counties:population: string (nullable = true)
    |-- info:governor: string (nullable = true)

    df_struct = spark.createDataFrame(
        data=[(("James", None, "Smith"), "OH", "M")],
        schema=StructType([
            StructField('name', StructType([
                StructField('firstname', StringType(), True),
                StructField('middlename', StringType(), True),
                StructField('lastname', StringType(), True)
            ])),
            StructField('state', StringType(), True),
            StructField('gender', StringType(), True)
        ]),
    )

    df_struct_flat = flatten_test(df_struct, sep=":")
    df_struct_flat.printSchema()
    root
    |-- state: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- name:firstname: string (nullable = true)
    |-- name:middlename: string (nullable = true)
    |-- name:lastname: string (nullable = true)
    """

    def _complex_fields(frame):
        # Column name -> dataType for every Array/Struct/Map column.
        # (isinstance instead of `type(...) ==` -- idiomatic and also
        # covers subclasses.)
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType, MapType))
        }

    complex_fields = _complex_fields(df)
    while complex_fields:
        col_name, col_type = next(iter(complex_fields.items()))

        if isinstance(col_type, StructType):
            # Promote each struct field to a top-level column named
            # "<parent><sep><field>".
            expanded = [
                col(col_name + "." + field.name).alias(col_name + sep + field.name)
                for field in col_type
            ]
            df = df.select("*", *expanded).drop(col_name)

        elif isinstance(col_type, ArrayType):
            # Explode array elements into rows; explode_outer keeps rows
            # whose array is null/empty.
            df = df.withColumn(col_name, explode_outer(col_name))

        elif isinstance(col_type, MapType):
            # Collect every distinct key present in the map column, then
            # promote each key to its own column.  This requires a
            # collect() and can be slow on large data.
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = [row[0] for row in keys_df.collect()]
            key_cols = [
                col(col_name).getItem(k).alias(str(col_name + sep + k))
                for k in keys
            ]
            # NOTE: use a distinct comprehension variable here -- the
            # original shadowed `col_name` inside this comprehension.
            remaining = [c for c in df.columns if c != col_name]
            df = df.select(remaining + key_cols)

        # Recompute: flattening one level may have exposed deeper levels.
        complex_fields = _complex_fields(df)

    return df
> {code}
> {code:java}
> # We take a dataframe and return a new one with required changes
# We take a dataframe and return a new one with required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
    """Return a copy of *df* whose (nested) field names are sanitized.

    Characters that are awkward or illegal in Spark column names
    (``- & " [ ] .``) are replaced with ``_`` at every nesting level
    (structs and array element types).

    NOTE(review): MapType key/value types are not recursed into --
    confirm whether maps with struct values need sanitizing too.
    """
    # Translation table built once: each troublesome character -> "_".
    # One C-level translate pass instead of six chained .replace() calls.
    _sanitize_table = str.maketrans({c: "_" for c in '-&"[].'})

    # Returns a new sanitized field name (this function can be anything really)
    def sanitizeFieldName(s: str) -> str:
        return s.translate(_sanitize_table)

    # We call this on all fields to create a copy and to perform any
    # changes we might want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        # copy() so the original schema object is left untouched.
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types.
        field.dataType = cleanSchema(field.dataType)
        return field

    # BUG FIX: the original annotation was `([DataType]) -> [DateType]`:
    # a DateType/DataType typo and invalid list-literal annotation syntax.
    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse; otherwise we
        # can return since we've reached a leaf node.
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top level fields.
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now that we have the new schema we can create a new DataFrame using
    # the old frame's RDD as data and the new schema as its schema.
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
> {code}
> {code:java}
def json_to_norm_with_null(dir_path, path_to_save):
    """Flatten every ``.json`` file in *dir_path* and append to *path_to_save*.

    Each file is read with Spark, its field names sanitized
    (``cleanDataFrame``), its nested columns flattened (``flatten_test``
    with ``":"`` separator), and the result appended as JSON with null
    fields preserved (``ignoreNullFields=false``).

    Parameters
    ----------
    dir_path : str
        Directory scanned (non-recursively) for ``.json`` files.
    path_to_save : str
        Output path for the appended JSON dataset.
    """
    for filename in os.listdir(dir_path):
        if not filename.endswith(".json"):
            continue
        fullname = os.path.join(dir_path, filename)
        # BUG FIX: the original also opened each file and json.load()-ed
        # it into an unused variable (`jsonstr`), reading every file
        # twice for no effect. Spark reads the file directly.
        df = spark.read.json(fullname)
        df = cleanDataFrame(df)
        df = flatten_test(df, sep=":")
        df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)
> {code}
> def cleanDataFrame is taken from a post at Stack Overflow.
> def flatten_test: the first two parts, for arrays and structs, are taken from [a gist on GitHub|https://gist.github.com/nmukerje/e65cde41be85470e4b8dfd9a2d6aed50]
> The examples in def flatten_test are taken from the pandas json_normalize function.
> There is one problem with def flatten_test: it needs to load dataframes one at a time if their schemas differ.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org