You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by lu...@sina.com on 2016/06/30 13:08:58 UTC

how to add a column according to an existing column of a dataframe?

Hi guys,     I have a dataframe with 3 columns, id(int), type(string), price(string), and I want to add a column "price range" according to the value of price.      I checked SPARK-15383; however, in my code I just want to append to resultprice a column that is derived from the original dataframe "resultprice" itself.     Is there a better way to do this, in terms of either implementation or code efficiency?     Would pattern matching help, and how?     Thank you guys.
here is my code:
    // For every row's `price` value, build a string of the form "<price>|<range code>":
    //   0     -> the text is "null" or empty
    //   22    -> the text contains '万' (which is replaced by "0000" in the output)
    //   1..20 -> 5000-wide buckets below 100000
    //   21    -> 100000 and above
    val priceRange = resultprice.select("price").map { x =>
      if (x.getString(0).trim == "null"||x.getString(0).trim == "") x(0).toString().trim.+("|0") else
      if (x.getString(0).trim.contains('万')) x(0).toString().trim.replaceAll("万", "0000").+("|22") else
      if (x.getString(0).trim.toInt < 5000) x(0).toString().+("|1") else
      if (x.getString(0).trim.toInt >= 5000  && x.getString(0).trim.toInt < 10000) x(0).toString().trim+("|2") else
      if (x.getString(0).trim.toInt >= 10000 && x.getString(0).trim.toInt < 15000) x(0).toString().trim.+("|3") else
      if (x.getString(0).trim.toInt >= 15000 && x.getString(0).trim.toInt < 20000) x(0).toString().trim.+("|4") else
      if (x.getString(0).trim.toInt >= 20000 && x.getString(0).trim.toInt < 25000) x(0).toString().trim.+("|5") else
      if (x.getString(0).trim.toInt >= 25000 && x.getString(0).trim.toInt < 30000) x(0).toString().trim.+("|6") else
      if (x.getString(0).trim.toInt >= 30000 && x.getString(0).trim.toInt < 35000) x(0).toString().trim.+("|7") else
      if (x.getString(0).trim.toInt >= 35000 && x.getString(0).trim.toInt < 40000) x(0).toString().trim.+("|8") else
      if (x.getString(0).trim.toInt >= 40000 && x.getString(0).trim.toInt < 45000) x(0).toString().trim.+("|9") else
      if (x.getString(0).trim.toInt >= 45000 && x.getString(0).trim.toInt < 50000) x(0).toString().trim.+("|10") else
      if (x.getString(0).trim.toInt >= 50000 && x.getString(0).trim.toInt < 55000) x(0).toString().trim.+("|11") else
      if (x.getString(0).trim.toInt >= 55000 && x.getString(0).trim.toInt < 60000) x(0).toString().trim.+("|12") else
      if (x.getString(0).trim.toInt >= 60000 && x.getString(0).trim.toInt < 65000) x(0).toString().trim.+("|13") else
      if (x.getString(0).trim.toInt >= 65000 && x.getString(0).trim.toInt < 70000) x(0).toString().trim.+("|14") else
      if (x.getString(0).trim.toInt >= 70000 && x.getString(0).trim.toInt < 75000) x(0).toString().trim.+("|15") else
      if (x.getString(0).trim.toInt >= 75000 && x.getString(0).trim.toInt < 80000) x(0).toString().trim.+("|16") else
      if (x.getString(0).trim.toInt >= 80000 && x.getString(0).trim.toInt < 85000) x(0).toString().trim.+("|17") else
      if (x.getString(0).trim.toInt >= 85000 && x.getString(0).trim.toInt < 90000) x(0).toString().trim.+("|18") else
      if (x.getString(0).trim.toInt >= 90000 && x.getString(0).trim.toInt < 95000) x(0).toString().trim.+("|19") else
      if (x.getString(0).trim.toInt >= 95000 && x.getString(0).trim.toInt < 100000) x(0).toString().trim.+("|20") else
      // NOTE(review): this final `if` has no `else`, so the chain is not typed as String;
      // presumably that is why `_.toString()` is needed further down — confirm.
      if (x.getString(0).trim.toInt >= 100000) x(0).toString().trim.+("|21")
    // NOTE(review): the closing brace and the next statement share one line in this rendering;
    // the quoted copy of this message shows them on separate lines.
    }    priceRange.collect().foreach(println)
    // Holder for the two parts of the "<price>|<range code>" string built above.
    case class PriceRange(price:String,priceRange:Int)
    // NOTE(review): `String.split` takes a regular expression and `|` is the regex alternation
    // operator, so `split("|")` does not split on the literal pipe character; `split("\\|")` would.
    val priceRange2 = priceRange.map(_.toString().split("|")).map { p => PriceRange(p(0), p(1).trim.toInt)}.toDF()
    // Column reference taken from `priceRange2`, a different DataFrame than `resultprice`.
    val priceRangeCol = priceRange2.apply("priceRange")
    // This is the call that raises the AnalysisException shown in the stack trace that follows:
    // `priceRangeCol` belongs to `priceRange2`, so it cannot be resolved against `resultprice`.
    val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
here is the stacktrace:
scala>     val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
org.apache.spark.sql.AnalysisException: resolved attribute(s) priceRange#2629 missing from lp_loupan_id#1,price_type#26,price#101 in operator !Project [lp_loupan_id#1,price_type#26,price#101,priceRange#2629 AS priceRange#2630];
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
        at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
        at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
        at $iwC$$iwC$$iwC.<init>(<console>:67)
        at $iwC$$iwC.<init>(<console>:69)
        at $iwC.<init>(<console>:71)
        at <init>(<console>:73)
        at .<init>(<console>:77)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


--------------------------------

 

Thanks & Best regards!
San.Luo

Re: how to add a column according to an existing column of a dataframe?

Posted by nguyen duc tuan <ne...@gmail.com>.
About the Spark issue that you refer to, it is not related to your problem :D
In this case, all you have to do is use the withColumn function. For example:
import org.apache.spark.sql.functions._
// Sketch only: the text after `=>` is a placeholder for the range computation, not Scala code.
val getRange = udf((x: Int) => get price range code ...)
// Adds a "range" column computed from the existing "price" column of the same DataFrame.
// NOTE(review): the question names its DataFrame `resultprice` and describes price as a
// string column — adjust the variable name and the UDF parameter type to match.
val priceRange = resultPrice.withColumn("range", getRange($"price"))

About code efficiency, this function can be as simple as:
// UDF mapping a raw price string to its range code, mirroring the buckets in the question:
//   0     -> missing price (null, empty, or the literal text "null")
//   22    -> price written with the '万' (ten-thousand) unit
//   1..20 -> 5000-wide buckets: below 5000 => 1, [5000, 10000) => 2, ..., [95000, 100000) => 20
//   21    -> 100000 and above
// Any other non-numeric text makes `toInt` throw NumberFormatException, as in the question's code.
val getRange = udf((x: String) => {
  // Trim first: the question's code trims the price text before every comparison.
  val price = Option(x).map(_.trim).getOrElse("")
  if (price.isEmpty || price == "null") 0
  else if (price.contains('万')) 22
  else {
    val intVal = price.toInt
    if (intVal >= 100000) 21
    else if (intVal < 5000) 1 // keeps negative values in bucket 1, as the question's `< 5000` test does
    else intVal / 5000 + 1    // integer division: 5000..9999 => 2, ..., 95000..99999 => 20
  }
})

In general, you can store all the boundary values of the ranges in an array
(in order), then loop through the values (or, more efficiently, use binary
search) to find which range a given value belongs to.

2016-06-30 20:08 GMT+07:00 <lu...@sina.com>:

> hi guys,
>      I have a dataframe with 3 columns, id(int) ,type(string)
> ,price(string) , and I want to add a column "price range", according to the
> value of price.
>      I checked the SPARK-15383
> <https://issues.apache.org/jira/browse/SPARK-15383>, however in my code I
> just want to append a column, which is transforming from the original
> dataframe "resultprice",  to resultprice
>      Is there a better way to do this? No matter the implements or code
> efficiency.
>      Will pattern matching help ? and How?
>      Thank you guys.
>
> here is my code:
>
>     val priceRange = resultprice.select("price").map { x =>
>       if (x.getString(0).trim == "null"||x.getString(0).trim == "")
> x(0).toString().trim.+("|0") else
>       if (x.getString(0).trim.contains('万'))
> x(0).toString().trim.replaceAll("万", "0000").+("|22") else
>       if (x.getString(0).trim.toInt < 5000) x(0).toString().+("|1") else
>       if (x.getString(0).trim.toInt >= 5000  && x.getString(0).trim.toInt
> < 10000) x(0).toString().trim+("|2") else
>       if (x.getString(0).trim.toInt >= 10000 && x.getString(0).trim.toInt
> < 15000) x(0).toString().trim.+("|3") else
>       if (x.getString(0).trim.toInt >= 15000 && x.getString(0).trim.toInt
> < 20000) x(0).toString().trim.+("|4") else
>       if (x.getString(0).trim.toInt >= 20000 && x.getString(0).trim.toInt
> < 25000) x(0).toString().trim.+("|5") else
>       if (x.getString(0).trim.toInt >= 25000 && x.getString(0).trim.toInt
> < 30000) x(0).toString().trim.+("|6") else
>       if (x.getString(0).trim.toInt >= 30000 && x.getString(0).trim.toInt
> < 35000) x(0).toString().trim.+("|7") else
>       if (x.getString(0).trim.toInt >= 35000 && x.getString(0).trim.toInt
> < 40000) x(0).toString().trim.+("|8") else
>       if (x.getString(0).trim.toInt >= 40000 && x.getString(0).trim.toInt
> < 45000) x(0).toString().trim.+("|9") else
>       if (x.getString(0).trim.toInt >= 45000 && x.getString(0).trim.toInt
> < 50000) x(0).toString().trim.+("|10") else
>       if (x.getString(0).trim.toInt >= 50000 && x.getString(0).trim.toInt
> < 55000) x(0).toString().trim.+("|11") else
>       if (x.getString(0).trim.toInt >= 55000 && x.getString(0).trim.toInt
> < 60000) x(0).toString().trim.+("|12") else
>       if (x.getString(0).trim.toInt >= 60000 && x.getString(0).trim.toInt
> < 65000) x(0).toString().trim.+("|13") else
>       if (x.getString(0).trim.toInt >= 65000 && x.getString(0).trim.toInt
> < 70000) x(0).toString().trim.+("|14") else
>       if (x.getString(0).trim.toInt >= 70000 && x.getString(0).trim.toInt
> < 75000) x(0).toString().trim.+("|15") else
>       if (x.getString(0).trim.toInt >= 75000 && x.getString(0).trim.toInt
> < 80000) x(0).toString().trim.+("|16") else
>       if (x.getString(0).trim.toInt >= 80000 && x.getString(0).trim.toInt
> < 85000) x(0).toString().trim.+("|17") else
>       if (x.getString(0).trim.toInt >= 85000 && x.getString(0).trim.toInt
> < 90000) x(0).toString().trim.+("|18") else
>       if (x.getString(0).trim.toInt >= 90000 && x.getString(0).trim.toInt
> < 95000) x(0).toString().trim.+("|19") else
>       if (x.getString(0).trim.toInt >= 95000 && x.getString(0).trim.toInt
> < 100000) x(0).toString().trim.+("|20") else
>       if (x.getString(0).trim.toInt >= 100000)
> x(0).toString().trim.+("|21")
>     }
>     priceRange.collect().foreach(println)
>     case class PriceRange(price:String,priceRange:Int)
>     val priceRange2 = priceRange.map(_.toString().split("|")).map { p =>
> PriceRange(p(0), p(1).trim.toInt)}.toDF()
>     val priceRangeCol = priceRange2.apply("priceRange")
>     val finalPrice = resultprice.withColumn("priceRange", priceRangeCol)
>
> here is the stacktrace:
> scala>     val finalPrice = resultprice.withColumn("priceRange",
> priceRangeCol)
> org.apache.spark.sql.AnalysisException: resolved attribute(s)
> priceRange#2629 missing from lp_loupan_id#1,price_type#26,price#101 in
> operator !Project [lp_loupan_id#1,price_type#26,price#101,priceRange#2629
> AS priceRange#2630];
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:183)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>         at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
>         at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>         at
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>         at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>         at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
>         at org.apache.spark.sql.DataFrame.org
> $apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
>         at org.apache.spark.sql.DataFrame.select(DataFrame.scala:707)
>         at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1188)
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
>         at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
>         at $iwC$$iwC$$iwC.<init>(<console>:67)
>         at $iwC$$iwC.<init>(<console>:69)
>         at $iwC.<init>(<console>:71)
>         at <init>(<console>:73)
>         at .<init>(<console>:77)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>         at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>         at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>         at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
>         at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
>         at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
>         at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
>         at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at
> org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
>         at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at org.apache.spark.repl.SparkILoop.org
> $apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>         at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>         at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> --------------------------------
>
> Thanks & Best regards!
> San.Luo
>