Posted to user@spark.apache.org by Scott W <de...@gmail.com> on 2016/07/05 05:02:59 UTC

Spark Dataframe validating column names

Hello,

I'm processing events using DataFrames converted from a stream of JSON
events (Spark Streaming), which eventually get written out in Parquet
format. Different kinds of JSON events come in, so we rely on the schema
inference feature of Spark SQL.

The problem is that some of the JSON events contain spaces in their keys. I
want to log such events and filter/drop them from the DataFrame before
converting it to Parquet, because space and ,;{}()\n\t= are considered
special characters in a Parquet schema (CatalystSchemaConverter), as listed
in [1] below, and thus are not allowed in column names.

How can I validate the column names in a DataFrame and drop such an event
altogether without failing the Spark Streaming job?

[1] Spark's CatalystSchemaConverter

def checkFieldName(name: String): Unit = {
  // ,;{}()\n\t= and space are special characters in Parquet schema
  checkConversionRequirement(
    !name.matches(".*[ ,;{}()\n\t=].*"),
    s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
       |Please use alias to rename it.
     """.stripMargin.split("\n").mkString(" ").trim)
}
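
As an illustration only (this is not part of Spark's API), the same character
set can be checked with a predicate that returns a result instead of throwing,
which makes it usable for logging and filtering. The names ParquetNameCheck,
isValidFieldName and invalidFieldNames are hypothetical, and the character set
is copied from the comment in the snippet above:

```scala
object ParquetNameCheck {
  // Assumption: the characters named in the checkFieldName comment above,
  // i.e. space plus , ; { } ( ) newline tab =
  val InvalidChars: Set[Char] = " ,;{}()\n\t=".toSet

  /** True if the name contains none of the characters listed above. */
  def isValidFieldName(name: String): Boolean =
    !name.exists(InvalidChars.contains)

  /** Returns the names that would fail the check, without throwing. */
  def invalidFieldNames(names: Seq[String]): Seq[String] =
    names.filterNot(isValidFieldName)
}
```

For example, ParquetNameCheck.invalidFieldNames(Seq("id", "user name", "k=v"))
returns Seq("user name", "k=v"), which could be logged before deciding what to
drop.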

Re: Spark Dataframe validating column names

Posted by Scott W <de...@gmail.com>.
Hi,

Yes, I tried that. However, I also want to pin down the specific events
whose column names contain invalid characters (per the Parquet spec) and
drop them from the DataFrame before converting it to Parquet.

Where I'm having trouble is that my DataFrame might contain events with
different sets of fields, so applying df.columns or df.schema directly
returns the superset schema for all the events.

I tried the approach below, but unfortunately an RDD[Row] cannot be converted
back to a DataFrame without a schema, and I'm relying on Spark SQL's
automatic schema inference.

df.map { r =>
  val fieldNames = r.schema.fieldNames

......
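
One possible way around the missing-schema problem, sketched under
assumptions: with JSON schema inference, a key that an event does not have
shows up as null in that event's row, so a row can be treated as "bad" when
any invalid-named column is non-null. The filtered RDD[Row] can then be turned
back into a DataFrame by reusing df.schema, the schema Spark already inferred.
DropBadEvents and dropRowsWithInvalidKeys are hypothetical names, and only
top-level fields are inspected:

```scala
import org.apache.spark.sql.{DataFrame, Row}

object DropBadEvents {
  // Assumption: same character set as the checkFieldName snippet in this thread.
  private val invalidChars: Set[Char] = " ,;{}()\n\t=".toSet

  /** Keeps only rows in which every invalid-named top-level field is null. */
  def dropRowsWithInvalidKeys(df: DataFrame): DataFrame = {
    // Positions of top-level columns whose names contain an invalid character.
    val badIdx: Array[Int] = df.schema.fieldNames.zipWithIndex.collect {
      case (name, i) if name.exists(invalidChars.contains) => i
    }
    // A row is kept when none of the invalid-named columns holds a value.
    val kept = df.rdd.filter((row: Row) => badIdx.forall(i => row.isNullAt(i)))
    // Reuse the inferred schema instead of supplying one by hand.
    df.sqlContext.createDataFrame(kept, df.schema)
  }
}
```

The result still carries the invalid-named columns (now entirely null), so
they would have to be dropped or renamed before writing Parquet. A key that is
present with an explicit JSON null is indistinguishable from an absent key in
this sketch.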

Thanks!

On Tue, Jul 5, 2016 at 5:56 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What do you think of using df.columns to know the column names and
> process appropriately or df.schema?

Re: Spark Dataframe validating column names

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

What do you think of using df.columns (or df.schema) to get the column names
and process them appropriately?
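
A minimal sketch of what that could look like, assuming the inferred schema
marks absent keys as null: df.columns is partitioned into valid and invalid
names, rows that have a value in any invalid-named column are split off as
rejected, and the remaining rows keep only the valid-named columns.
SplitByColumnNames and split are hypothetical names, the character set is taken
from the checkFieldName snippet quoted in this thread, and column names that
themselves contain a backtick are not handled:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

object SplitByColumnNames {
  // Assumption: same character set as the quoted checkFieldName snippet.
  private val invalidChars: Set[Char] = " ,;{}()\n\t=".toSet

  // Backticks let col() resolve names containing spaces or punctuation.
  private def quoted(name: String): Column = col(s"`$name`")

  /** Returns (clean, rejected); clean has only valid-named columns. */
  def split(df: DataFrame): (DataFrame, DataFrame) = {
    val (bad, good) = df.columns.partition(_.exists(invalidChars.contains))
    // True for a row when any invalid-named column holds a value.
    val hasBadKey: Column =
      bad.map(quoted(_).isNotNull).foldLeft(lit(false))(_ || _)
    val clean = df.filter(!hasBadKey).select(good.map(quoted): _*)
    val rejected = df.filter(hasBadKey)
    (clean, rejected)
  }
}
```

The rejected DataFrame can be logged or written elsewhere, while clean should
pass the Parquet column-name check for top-level fields. Nested struct fields
are not inspected here.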

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


