You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Kevin Mellott <ke...@gmail.com> on 2016/01/21 01:34:51 UTC

Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

Hi Andy,

According to the API documentation for DataFrame
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>,
you should have access to *sqlContext* as a property off of the DataFrame
instance. In your example, you could then do something like:

df.sqlContext.udf.register(...)

Thanks,
Kevin

On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <
Andy@santacruzintegration.com> wrote:

> For clarity callUDF() is not defined on DataFrames. It is defined on org.apache.spark.sql.functions
> . Strange the class name starts with lower case. I have not figure out
> how to use function class.
>
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
>
> Andy
>
> From: Andrew Davidson <An...@SantaCruzIntegration.com>
> Date: Wednesday, January 20, 2016 at 4:05 PM
> To: "user @spark" <us...@spark.apache.org>
> Subject: trouble implementing complex transformer in java that can be
> used with Pipeline. Scala to Java porting problem
>
> I am using 1.6.0. I am having trouble implementing a custom transformer
> derived from org.apache.spark.ml.Transformer in Java that I can use in
> a PipeLine.
>
> So far the only way I figure out how to implement any kind of complex
> functionality and have it applied to a DataFrame is to implement a UDF. For
> example
>
>
>    class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>
>         private static final long serialVersionUID = 1L;
>
>
>         @Override
>
>         public List<String> call(String text) throws Exception {
>
>             List<String> ret = stemText(text); //call org.apache.lucene
>
>             return ret;
>
>         }
>
>     }
>
>
> Before I can use the UDF it needs to be registered. This requires the
> sqlContext. *The problem is sqlContext is not available during
> pipeline.load()*
>
>    void registerUDF(SQLContext sqlContext) {
>
>         if (udf == null) {
>
>             udf = new StemmerUDF();
>
>             DataType returnType = DataTypes.createArrayType(DataTypes.
> StringType);
>
>             sqlContext.udf().register(udfName, udf, returnType);
>
>         }
>
>     }
>
>
> Our transformer needs to implement transform(). For it to be able to use
> the registered UDF we need the sqlContext. *The problem is the sqlContext
> is not part of the signature of transform.* My current hack is to pass
> the sqlContext to the constructor and not to use pipelines
>
>   @Override
>
>     public DataFrame transform(DataFrame df) {
>
>       String fmt = "%s(%s) as %s";
>
>         String stmt = String.format(fmt, udfName, inputCol, outputCol);
>
>         logger.info("\nstmt: {}", stmt);
>
>         DataFrame ret = df.selectExpr("*", stmt);
>
>         return ret;
>
> }
>
>
> Is they a way to do something like df.callUDF(myUDF);
>
>
> *The following Scala code looks like it is close to what I need. I not
> been able to figure out how do something like this in Java 8. callUDF does
> not seem to be avaliable.*
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>
> @DeveloperApi
>
> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT,
> T]]
>
>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>
> . . .
>
>
>  override def transform(dataset: DataFrame): DataFrame = {
>
>     transformSchema(dataset.schema, logging = true)
>
>     dataset.withColumn($(outputCol),
>
>       callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol
> ))))
>
>   }
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>
>
> class Tokenizer(override val uid: String)
>
>   extends UnaryTransformer[String, Seq[String], Tokenizer] with
> DefaultParamsWritable {
>
>
> . . .
>
>   override protected def createTransformFunc: String => Seq[String] = {
>
>     _.toLowerCase.split("\\s")
>
>   }
>
> . . .
>
> }
>
>
> Kind regards
>
>
> Andy
>
>
>

Re: trouble implementing complex transformer in java that can be used with Pipeline. Scala to Java porting problem

Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
Very Nice!. Many thanks Kevin. I wish I found this out a couple of weeks
ago.

Andy

From:  Kevin Mellott <ke...@gmail.com>
Date:  Wednesday, January 20, 2016 at 4:34 PM
To:  Andrew Davidson <An...@SantaCruzIntegration.com>
Cc:  "user @spark" <us...@spark.apache.org>
Subject:  Re: trouble implementing complex transformer in java that can be
used with Pipeline. Scala to Java porting problem

> Hi Andy,
> 
> According to the API documentation for DataFrame
> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql
> .DataFrame> , you should have access to sqlContext as a property off of the
> DataFrame instance. In your example, you could then do something like:
> 
> df.sqlContext.udf.register(...)
> 
> Thanks,
> Kevin
> 
> On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <An...@santacruzintegration.com>
> wrote:
>> For clarity callUDF() is not defined on DataFrames. It is defined on
>> org.apache.spark.sql.functions . Strange the class name starts with lower
>> case. I have not figure out how to use function class.
>> 
>> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.h
>> tml
>> 
>> Andy
>> 
>> From:  Andrew Davidson <An...@SantaCruzIntegration.com>
>> Date:  Wednesday, January 20, 2016 at 4:05 PM
>> To:  "user @spark" <us...@spark.apache.org>
>> Subject:  trouble implementing  complex transformer in java that can be used
>> with Pipeline. Scala to Java porting problem
>> 
>>> I am using 1.6.0. I am having trouble implementing a custom transformer
>>> derived from org.apache.spark.ml.Transformer in Java that I can use in a
>>> PipeLine.
>>> 
>>> So far the only way I figure out how to implement any kind of complex
>>> functionality and have it applied to a DataFrame is to implement a UDF. For
>>> example
>>> 
>>> 
>>>    class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>>> 
>>>         private static final long serialVersionUID = 1L;
>>> 
>>> 
>>> 
>>>         @Override
>>> 
>>>         public List<String> call(String text) throws Exception {
>>> 
>>>             List<String> ret = stemText(text); //call org.apache.lucene
>>> 
>>>             return ret;
>>> 
>>>         }
>>> 
>>>     }
>>> 
>>> 
>>> 
>>> Before I can use the UDF it needs to be registered. This requires the
>>> sqlContext. The problem is sqlContext is not available during
>>> pipeline.load()
>>> 
>>> 
>>>    void registerUDF(SQLContext sqlContext) {
>>> 
>>>         if (udf == null) {
>>> 
>>>             udf = new StemmerUDF();
>>> 
>>>             DataType returnType =
>>> DataTypes.createArrayType(DataTypes.StringType);
>>> 
>>>             sqlContext.udf().register(udfName, udf, returnType);
>>> 
>>>         }
>>> 
>>>     }
>>> 
>>> 
>>> Our transformer needs to implement transform(). For it to be able to use the
>>> registered UDF we need the sqlContext. The problem is the sqlContext is not
>>> part of the signature of transform. My current hack is to pass the
>>> sqlContext to the constructor and not to use pipelines
>>>   @Override
>>> 
>>>     public DataFrame transform(DataFrame df) {
>>> 
>>>       String fmt = "%s(%s) as %s";
>>> 
>>>         String stmt = String.format(fmt, udfName, inputCol, outputCol);
>>> 
>>>         logger.info("\nstmt: {}", stmt);
>>> 
>>>         DataFrame ret = df.selectExpr("*", stmt);
>>> 
>>>         return ret;
>>> 
>>> }
>>> 
>>> 
>>> 
>>> Is they a way to do something like df.callUDF(myUDF);
>>> 
>>> 
>>> 
>>> The following Scala code looks like it is close to what I need. I not been
>>> able to figure out how do something like this in Java 8. callUDF does not
>>> seem to be avaliable.
>>> 
>>> 
>>> 
>>> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>>> 
>>> @DeveloperApi
>>> 
>>> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>>> 
>>>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>>> 
>>> 
>>> 
>>> . . .
>>> 
>>> 
>>> 
>>>  override def transform(dataset: DataFrame): DataFrame = {
>>> 
>>>     transformSchema(dataset.schema, logging = true)
>>> 
>>>     dataset.withColumn($(outputCol),
>>> 
>>>       callUDF(this.createTransformFunc, outputDataType,
>>> dataset($(inputCol))))
>>> 
>>>   }
>>> 
>>> 
>>> 
>>> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer
>>> .scala 
>>> 
>>> 
>>> 
>>> class Tokenizer(override val uid: String)
>>> 
>>>   extends UnaryTransformer[String, Seq[String], Tokenizer] with
>>> DefaultParamsWritable {
>>> 
>>> 
>>> 
>>> . . .
>>> 
>>>   override protected def createTransformFunc: String => Seq[String] = {
>>> 
>>>     _.toLowerCase.split("\\s")
>>> 
>>>   }
>>> 
>>> . . .
>>> 
>>> }
>>> 
>>> 
>>> 
>>> Kind regards
>>> 
>>> 
>>> 
>>> Andy
>>> 
>>> 
>