Posted to issues@spark.apache.org by "Everett Rush (Jira)" <ji...@apache.org> on 2020/06/10 21:53:00 UTC
[jira] [Comment Edited] (SPARK-27249) Developers API for Transformers beyond UnaryTransformer
[ https://issues.apache.org/jira/browse/SPARK-27249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17132743#comment-17132743 ]
Everett Rush edited comment on SPARK-27249 at 6/10/20, 9:52 PM:
----------------------------------------------------------------
I checked out the code and the design. Thanks for the great effort, but this doesn't quite meet the need. The transformer contract can still be met with a transform function that returns a DataFrame.
I would like something more like this.
{code:java}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasOutputCol
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataType, StructField, StructType}

@DeveloperApi
abstract class MultiColumnTransformer[T <: MultiColumnTransformer[T]]
  extends Transformer with HasOutputCol with Logging {

  def setOutputCol(value: String): T = set(outputCol, value).asInstanceOf[T]

  /** Returns the data type of the output column. */
  protected def outputDataType: DataType

  /** Partition-level transformation supplied by subclasses. */
  protected def transformFunc: Iterator[Row] => Iterator[Row]

  override def transformSchema(schema: StructType): StructType = {
    if (schema.fieldNames.contains($(outputCol))) {
      throw new IllegalArgumentException(s"Output column ${$(outputCol)} already exists.")
    }
    val outputFields = schema.fields :+
      StructField($(outputCol), outputDataType, nullable = false)
    StructType(outputFields)
  }

  def transform(dataset: DataFrame, targetSchema: StructType): DataFrame = {
    val targetEncoder = RowEncoder(targetSchema)
    dataset.mapPartitions(transformFunc)(targetEncoder)
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    val dataframe = dataset.toDF()
    val targetSchema = transformSchema(dataframe.schema, logging = true)
    transform(dataframe, targetSchema)
  }

  override def copy(extra: ParamMap): T = defaultCopy(extra)
}
{code}
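To make the Iterator[Row] => Iterator[Row] contract concrete, here is a minimal sketch in plain Scala that runs without Spark. Everything in it is a stand-in for illustration: the Row alias (a Vector[Any] in place of org.apache.spark.sql.Row), the ExpensiveScorer class (a placeholder for something like a database connection), and the object name. It only shows the shape I have in mind: the function reads more than one column from each row and builds the costly object once per partition rather than once per row.

```scala
object PartitionFuncSketch {
  // Stand-in for org.apache.spark.sql.Row (assumption, not the Spark type).
  type Row = Vector[Any]

  // Hypothetical resource that is costly to build, e.g. a database connection.
  // The counter records how many instances have been created.
  final class ExpensiveScorer {
    ExpensiveScorer.created += 1
    def score(a: Int, b: Int): Int = a * 10 + b
  }
  object ExpensiveScorer { var created: Int = 0 }

  // Same shape as the proposed transformFunc: one call per partition.
  val transformFunc: Iterator[Row] => Iterator[Row] = { rows =>
    // Built when the function is applied, i.e. once per partition.
    val scorer = new ExpensiveScorer
    rows.map { row =>
      // Reads two input columns and appends one output column.
      val a = row(0).asInstanceOf[Int]
      val b = row(1).asInstanceOf[Int]
      row :+ scorer.score(a, b)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two simulated partitions holding three rows in total.
    val partitions: Seq[Seq[Row]] = Seq(
      Seq(Vector[Any](1, 2), Vector[Any](3, 4)),
      Seq(Vector[Any](5, 6))
    )
    val output = partitions.map(p => transformFunc(p.iterator).toList)
    output.foreach(println)
    println(s"scorers created: ${ExpensiveScorer.created}")
  }
}
```

With two simulated partitions and three rows, only two ExpensiveScorer instances are built. A row-level UDF cannot express that.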
was (Author: enrush):
I checked out the code and the design. Thanks for the great effort, but this doesn't quite meet the need. The transformer contract can still be met with a transform function that returns a Dataframe.
I would like something more like this.
{{@DeveloperApi}}
{{abstract class MultiColumnTransformer[T <: MultiColumnTransformer[T]]}}
{{  extends Transformer with HasOutputCol with Logging {}}
{{  def setOutputCol(value: String): T = set(outputCol, value).asInstanceOf[T]}}
{{  /** Returns the data type of the output column. */}}
{{  protected def outputDataType: DataType}}
{{  protected def transformFunc: Iterator[Row] => Iterator[Row]}}
{{  override def transformSchema(schema: StructType): StructType = {}}
{{  }}}
{{  def transform(dataset: DataFrame, targetSchema: StructType): DataFrame = {}}
{{    val targetEncoder = RowEncoder(targetSchema)}}
{{    dataset.mapPartitions(transformFunc)(targetEncoder)}}
{{  }}}
{{  override def transform(dataset: Dataset[_]): DataFrame = {}}
{{    val dataframe = dataset.toDF()}}
{{    val targetSchema = transformSchema(dataframe.schema, logging = true)}}
{{    transform(dataframe, targetSchema)}}
{{  }}}
{{  override def copy(extra: ParamMap): T = defaultCopy(extra)}}
{{}}}
> Developers API for Transformers beyond UnaryTransformer
> -------------------------------------------------------
>
> Key: SPARK-27249
> URL: https://issues.apache.org/jira/browse/SPARK-27249
> Project: Spark
> Issue Type: New Feature
> Components: ML
> Affects Versions: 3.1.0
> Reporter: Everett Rush
> Priority: Minor
> Labels: starter
> Attachments: Screen Shot 2020-01-17 at 4.20.57 PM.png
>
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> It would be nice to have a developers' API for dataset transformations that need more than one column from a row (UnaryTransformer, by contrast, takes one input column and produces one output column) or that contain objects too expensive to initialize repeatedly in a UDF, such as a database connection.
>
> Design:
> Abstract class PartitionTransformer extends Transformer and defines the partition transformation function as Iterator[Row] => Iterator[Row].
> NB: This parallels the UnaryTransformer createTransformFunc method.
>
> When developers subclass this transformer, they can provide their own schema for the output Row, in which case the PartitionTransformer creates a row encoder and executes the transformation. Alternatively, the developer can set the output DataType and output column name. The PartitionTransformer class will then create a new schema and a row encoder, and execute the transformation.