Posted to user@spark.apache.org by Kevin Mellott <ke...@gmail.com> on 2016/01/21 01:34:51 UTC
Re: trouble implementing complex transformer in java that can be used
with Pipeline. Scala to Java porting problem
Hi Andy,
According to the API documentation for DataFrame
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame>,
you should have access to *sqlContext* as a property of the DataFrame
instance. In your example, you could then do something like:
    df.sqlContext.udf.register(...)
In Java these are method calls: df.sqlContext().udf().register(...)
Thanks,
Kevin
On Wed, Jan 20, 2016 at 6:15 PM, Andy Davidson <Andy@santacruzintegration.com> wrote:
> For clarity, callUDF() is not defined on DataFrame. It is defined on
> org.apache.spark.sql.functions. Strangely, the class name starts with a
> lower-case letter. I have not figured out how to use the functions class.
>
>
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html
>
> Andy
>
> From: Andrew Davidson <An...@SantaCruzIntegration.com>
> Date: Wednesday, January 20, 2016 at 4:05 PM
> To: "user @spark" <us...@spark.apache.org>
> Subject: trouble implementing complex transformer in java that can be
> used with Pipeline. Scala to Java porting problem
>
> I am using Spark 1.6.0. I am having trouble implementing a custom
> transformer derived from org.apache.spark.ml.Transformer in Java that I can
> use in a Pipeline.
>
> So far, the only way I have figured out to implement any kind of complex
> functionality and have it applied to a DataFrame is to implement a UDF. For
> example:
>
>
> import java.io.Serializable;
> import java.util.List;
> import org.apache.spark.sql.api.java.UDF1;
>
> class StemmerUDF implements UDF1<String, List<String>>, Serializable {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public List<String> call(String text) throws Exception {
>         // stemText() is my own helper; it calls org.apache.lucene
>         List<String> ret = stemText(text);
>         return ret;
>     }
> }
>
>
> Before I can use the UDF, it needs to be registered. This requires the
> sqlContext. *The problem is that sqlContext is not available during
> pipeline.load().*
>
> void registerUDF(SQLContext sqlContext) {
>     if (udf == null) {
>         udf = new StemmerUDF();
>         DataType returnType = DataTypes.createArrayType(DataTypes.StringType);
>         sqlContext.udf().register(udfName, udf, returnType);
>     }
> }
>
>
> Our transformer needs to implement transform(). To use the registered UDF
> there, we need the sqlContext. *The problem is that the sqlContext is not
> part of the signature of transform().* My current hack is to pass the
> sqlContext to the constructor and not use pipelines:
>
> @Override
> public DataFrame transform(DataFrame df) {
>     // Build a SQL expression of the form "udfName(inputCol) as outputCol".
>     String fmt = "%s(%s) as %s";
>     String stmt = String.format(fmt, udfName, inputCol, outputCol);
>     logger.info("\nstmt: {}", stmt);
>
>     // Keep all existing columns and append the UDF result.
>     DataFrame ret = df.selectExpr("*", stmt);
>     return ret;
> }
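
To make the selectExpr call in transform() above concrete, here is a small standalone sketch of the expression string it builds. It needs no Spark at all. The values "stem", "text", and "stems" for udfName, inputCol, and outputCol are placeholders assumed for illustration; the original post does not show the real ones.

```java
// Standalone sketch: only shows the string that transform() hands to selectExpr.
final class SelectExprSketch {
    // Same format string as in transform() above.
    static String buildStmt(String udfName, String inputCol, String outputCol) {
        return String.format("%s(%s) as %s", udfName, inputCol, outputCol);
    }

    public static void main(String[] args) {
        // Hypothetical names, assumed for illustration only.
        System.out.println(buildStmt("stem", "text", "stems")); // prints: stem(text) as stems
    }
}
```

Because the UDF is referenced by name inside this string, it has to be registered under that same name before selectExpr is evaluated.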
>
>
> Is there a way to do something like df.callUDF(myUDF)?
>
>
> *The following Scala code looks close to what I need. I have not been
> able to figure out how to do something like this in Java 8. callUDF does
> not seem to be available.*
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
>
> @DeveloperApi
> abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
>   extends Transformer with HasInputCol with HasOutputCol with Logging {
>
>   . . .
>
>   override def transform(dataset: DataFrame): DataFrame = {
>     transformSchema(dataset.schema, logging = true)
>     dataset.withColumn($(outputCol),
>       callUDF(this.createTransformFunc, outputDataType, dataset($(inputCol))))
>   }
>
>
>
> spark/spark-1.6.0/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
>
>
> class Tokenizer(override val uid: String)
>   extends UnaryTransformer[String, Seq[String], Tokenizer] with DefaultParamsWritable {
>
>   . . .
>
>   override protected def createTransformFunc: String => Seq[String] = {
>     _.toLowerCase.split("\\s")
>   }
>
>   . . .
> }
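
For the porting question, the function body in the Tokenizer snippet above translates to plain Java 8 fairly directly. The following is only a sketch of that one function, not of a full Transformer; the class name TokenizerFuncSketch and the field name TOKENIZE are made up for illustration, and no Spark classes are involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Standalone sketch: plain Java 8, no Spark classes involved.
final class TokenizerFuncSketch {
    // Java equivalent of the Scala body `_.toLowerCase.split("\\s")`:
    // lower-case the text, then split on single whitespace characters.
    static final Function<String, List<String>> TOKENIZE =
        text -> Arrays.asList(text.toLowerCase().split("\\s"));

    public static void main(String[] args) {
        System.out.println(TOKENIZE.apply("Hello Spark World")); // prints: [hello, spark, world]
    }
}
```

The same body should also fit inside the call() method of a UDF1<String, List<String>> such as StemmerUDF, which is the shape the Java UDF registration expects.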
>
>
> Kind regards
>
>
> Andy
>
>
>
Re: trouble implementing complex transformer in java that can be used
with Pipeline. Scala to Java porting problem
Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
Very nice! Many thanks, Kevin. I wish I had found this out a couple of weeks
ago.
Andy