You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Xiangrui Meng (JIRA)" <ji...@apache.org> on 2014/11/04 07:32:34 UTC

[jira] [Resolved] (SPARK-3573) Dataset

     [ https://issues.apache.org/jira/browse/SPARK-3573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiangrui Meng resolved SPARK-3573.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0

Issue resolved by pull request 3070
[https://github.com/apache/spark/pull/3070]

> Dataset
> -------
>
>                 Key: SPARK-3573
>                 URL: https://issues.apache.org/jira/browse/SPARK-3573
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>            Reporter: Xiangrui Meng
>            Assignee: Xiangrui Meng
>            Priority: Critical
>             Fix For: 1.2.0
>
>
> This JIRA is for discussion of ML dataset, essentially a SchemaRDD with extra ML-specific metadata embedded in its schema.
> .Sample code
> Suppose we have training events stored on HDFS and user/ad features in Hive, we want to assemble features for training and then apply decision tree.
> The proposed pipeline with dataset looks like the following (need more refinements):
> {code}
> sqlContext.jsonFile("/path/to/training/events", 0.01).registerTempTable("event")
> val training = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId, event.action AS label,
>          user.gender AS userGender, user.country AS userCountry, user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""").cache()
> val indexer = new Indexer()
> val interactor = new Interactor()
> val fvAssembler = new FeatureVectorAssembler()
> val treeClassifer = new DecisionTreeClassifer()
> val paramMap = new ParamMap()
>   .put(indexer.features, Map("userCountryIndex" -> "userCountry"))
>   .put(indexer.sortByFrequency, true)
>   .put(interactor.features, Map("genderMatch" -> Array("userGender", "targetGender")))
>   .put(fvAssembler.features, Map("features" -> Array("genderMatch", "userCountryIndex", "userFeatures")))
>   .put(fvAssembler.dense, true)
>   .put(treeClassifer.maxDepth, 4) // By default, classifier recognizes "features" and "label" columns.
> val pipeline = Pipeline.create(indexer, interactor, fvAssembler, treeClassifier)
> val model = pipeline.fit(training, paramMap)
> sqlContext.jsonFile("/path/to/events", 0.01).registerTempTable("event")
> val test = sqlContext.sql("""
>   SELECT event.id AS eventId, event.userId AS userId, event.adId AS adId,
>          user.gender AS userGender, user.country AS userCountry, user.features AS userFeatures,
>          ad.targetGender AS targetGender
>     FROM event JOIN user ON event.userId = user.id JOIN ad ON event.adId = ad.id;""")
> val prediction = model.transform(test).select('eventId, 'prediction)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org