You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by marc nicole <mk...@gmail.com> on 2022/05/28 10:13:44 UTC
k-anonymity with Spark in Java
Hi Spark devs,
Anybody willing to check my code implementing *k-anonymity*?
public static Dataset < Row > kAnonymizeBySuppression(SparkSession
sparksession, Dataset < Row > initDataset, List < String > qidAtts, Integer
k_anonymity_constant) {
Dataset < Row > anonymizedDF = sparksession.emptyDataFrame();
Dataset < Row > tmpDF = sparksession.emptyDataFrame();
List < Column > groupByQidAttributes = qidAtts.stream().map(functions::
col).collect(Collectors.toList());
// groupBy and count each occurence.
Dataset < Row > groupedRowsDF = initDataset.withColumn("qidsFreqs",
count("*").over(Window.partitionBy(groupByQidAttributes.toArray(new Column[
groupByQidAttributes.size()]))));
Dataset < Row > rowsDeleteDF =
groupedRowsDF.select(col("*")).where("qidsFreqs
<" + k_anonymity_constant).toDF();
tmpDF = groupedRowsDF.select(col("*")).where("qidsFreqs >=" +
k_anonymity_constant).toDF();
for (String qidAtt: qidAtts) {
Dataset < Row > groupedRowsProcDF = rowsDeleteDF.withColumn(
"attFreq", approx_count_distinct(qidAtt).over(Window.partitionBy(
groupByQidAttributes.toArray(new Column[groupByQidAttributes.size()]))));
Dataset < Row > rowsDeleteDFUpdate = groupedRowsProcDF.select(col(
"*")).where("attFreq <" + k_anonymity_constant).toDF();
if (anonymizedDF.count() == 0)
anonymizedDF = rowsDeleteDFUpdate;
if (rowsDeleteDF.count() != 0) {
anonymizedDF = anonymizedDF.drop("attFreq").withColumn(qidAtt,
lit("*"));
}
}
return tmpDF.drop("qidsFreqs").union(anonymizedDF.drop("qidsFreqs"));
}
Thanks in advance for any comments that could help improve it.
Marc.