Posted to user@spark.apache.org by Arkadiusz Bicz <ar...@gmail.com> on 2016/01/13 15:39:59 UTC

How to make Dataset API as fast as DataFrame

Hi,

I have done some performance tests, repeating the execution with
different numbers of executors and different memory settings on a YARN
cluster running Spark 1.6.0 (the cluster has 6 large nodes).

I found that Dataset joinWith or cogroup is 3 to 5 times slower than a
broadcast join with DataFrame. How can I make it at least comparably fast?

Examples of my code:

DataFrame:
// 500 million rows
val r = results.select("tradeId", "tradeVersion", "values").as("r")
// 100 thousand rows
val t = trades.select("tradeId", "tradeVersion", "agreement").distinct.as("t")

val j = r.join(broadcast(t), r("tradeId") === t("tradeId") &&
r("tradeVersion") === t("tradeVersion"))
val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
val g = s.groupBy(t("agreement"))

val maxvec = new MaxVectorAggFunction
val agg = g.agg(maxvec(r("values")).as("maxvalues"))
agg.write.parquet("hdfs:.../tmp/somelocation")
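
MaxVectorAggFunction is my custom UDAF over Array[Double]; I have not included
the implementation, but a minimal sketch of the idea (assuming an element-wise
max over equal-length arrays; the real class may differ) looks like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch only: keeps the element-wise maximum of Array[Double] values.
class MaxVectorAggFunction extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("values", ArrayType(DoubleType))
  def bufferSchema: StructType = new StructType().add("acc", ArrayType(DoubleType))
  def dataType: DataType = ArrayType(DoubleType)
  def deterministic: Boolean = true

  // Element-wise max; an empty or null side falls back to the other.
  private def elementWiseMax(a: Seq[Double], b: Seq[Double]): Seq[Double] =
    if (a == null || a.isEmpty) b
    else if (b == null || b.isEmpty) a
    else a.zip(b).map { case (x, y) => math.max(x, y) }

  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = Seq.empty[Double]
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = elementWiseMax(buffer.getSeq[Double](0), input.getSeq[Double](0))
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = elementWiseMax(buffer1.getSeq[Double](0), buffer2.getSeq[Double](0))
  def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
}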

Dataset:

case class ResultsA(tradeId: String, tradeVersion: String, resultType: Int, values: Array[Double])

case class TradesA(tradeId: String, tradeVersion: String, tradeType: String, notional: BigDecimal,
                   currency: String, asset: String, trader: String, productCode: String,
                   counterParty: String, counterPartyAccronym: String, tradeStatus: String,
                   portfolio: String, internalPortfolio: String, ptsBook: String, validFrom: String,
                   validTill: String, tradeDate: String, maturity: String, buySellIndicator: String,
                   agreement: String)

case class ResultSmallA(tradeId: String, tradeVersion: String, values: Array[Double])
case class ResultAgreementA(tradeId: String, tradeVersion: String, agreement: String, values: Array[Double])
case class TradeSmallA(tradeId: String, tradeVersion: String, agreement: String)

lazy val dsresults = results.as[ResultsA].map(r => ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
lazy val dstrades = trades.as[TradesA].map(t => TradeSmallA(t.tradeId, t.tradeVersion, t.agreement)).distinct.as("t")
lazy val j = dsresults.joinWith(dstrades, $"r.tradeId" === $"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion", "inner")

//1. MapGrouped

val group = j.groupBy { case (r: ResultSmallA, t: TradeSmallA) => t }

val reduced = group.mapGroups { case (t, iter) =>
  (t.tradeId, t.tradeVersion, t.agreement,
   iter.map { case (r, _) => r.values }.reduce { (l, r) =>
     val min = new MinVectorAggFunction(); min.mergeArrays(l, r)
   })
}

//2. Reduce

val group2 = j.groupBy(_._2)

val reduced2 = group2.reduce { (i1, i2) =>
  val r1 = i1._1
  val r2 = i2._1
  import r1._
  val min = new MinVectorAggFunction()
  (ResultSmallA(tradeId, tradeVersion, min.mergeArrays(values, r2.values)), i1._2)
}

val reduced = reduced2.map { case (t, (r, _)) =>
  (r.tradeId, r.tradeVersion, t.agreement, r.values)
}


//3. Cogroup

val cogrouped1 = dsresults.groupBy(r => (r.tradeId, r.tradeVersion))
  .cogroup(dstrades.groupBy(t => (t.tradeId, t.tradeVersion))) {
    case (key, data1, data2) =>
      if (data2.isEmpty || data1.isEmpty) Iterator()
      else {
        val t = data2.next()
        val min = new MinVectorAggFunction()
        Iterator((t.tradeId, t.tradeVersion, t.agreement, data1.map(_.values).reduce(min.mergeArrays)))
      }
  }

// MinVectorAggFunction just merges two arrays of Double
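
A minimal sketch of mergeArrays (assuming an element-wise minimum of two
equal-length vectors; my actual class may differ slightly):

class MinVectorAggFunction extends Serializable {
  // Element-wise minimum; an empty or null side falls back to the other.
  def mergeArrays(a: Array[Double], b: Array[Double]): Array[Double] =
    if (a == null || a.isEmpty) b
    else if (b == null || b.isEmpty) a
    else a.zip(b).map { case (x, y) => math.min(x, y) }
}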



Re: How to make Dataset API as fast as DataFrame

Posted by Arkadiusz Bicz <ar...@gmail.com>.
Hi,

Including the query plans:
DataFrame :

== Physical Plan ==
SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Final,isDistinct=false)],
output=[agreement#23,maxvalues#27])
+- ConvertToSafe
   +- Sort [agreement#23 ASC], false, 0
      +- TungstenExchange hashpartitioning(agreement#23,48), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Partial,isDistinct=false)],
output=[agreement#23,values#26])
               +- ConvertToSafe
                  +- Sort [agreement#23 ASC], false, 0
                     +- Project [agreement#23,values#3]
                        +- BroadcastHashJoin
[tradeId#0,tradeVersion#1], [tradeId#4,tradeVersion#5], BuildRight
                           :- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,values#3]
                           +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
                              +- TungstenExchange
hashpartitioning(tradeId#4,tradeVersion#5,agreement#23,48), None
                                 +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
                                    +- Scan ParquetRelation[hdfs://



//1. MapGrouped
== Physical Plan ==
!MapGroups <function2>, class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string], class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]:
array<double>], [tradeId#79,tradeVersion#80,agreement#81],
[_1#88,_2#89,_3#90,_4#91]
+- ConvertToSafe
   +- Sort [tradeId#79 ASC,tradeVersion#80 ASC,agreement#81 ASC], false, 0
      +- TungstenExchange
hashpartitioning(tradeId#79,tradeVersion#80,agreement#81,48), None
         +- ConvertToUnsafe
            +- !AppendColumns <function1>, class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]:
string], [tradeId#79,tradeVersion#80,agreement#81]
               +- Project
[struct(tradeId#38,tradeVersion#39,values#40) AS
_1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74]
                  +- BroadcastHashJoin [tradeId#38,tradeVersion#39],
[tradeId#67,tradeVersion#68], BuildRight
                     :- ConvertToUnsafe
                     :  +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int,
values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]:
string, values[0]: array<double>],
[tradeId#38,tradeVersion#39,values#40]
                     :     +- ConvertToSafe
                     :        +- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
                     +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                        +- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
                           +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                              +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, tradeStatus[0]: string,
portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]:
string, validFrom[0]: string, validTill[0]: string, tradeDate[0]:
string, maturity[0]: string, buySellIndicator[0]: string,
agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string],
[tradeId#67,tradeVersion#68,agreement#69]
                                 +- ConvertToSafe
                                    +- Scan ParquetRelation[hdfs://na...


//2. Reduce

== Physical Plan ==
!MapPartitions <function1>, class[_1[0]:
struct<tradeId:string,tradeVersion:string,agreement:string>, _2[0]:
struct<_1:struct<tradeId:string,tradeVersion:string,values:array<double>>,_2:struct<tradeId:string,tradeVersion:string,agreement:string>>],
class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]:
array<double>], [_1#110,_2#111,_3#112,_4#113]
+- !MapGroups <function2>, class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string], class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[_1[0]: struct<tradeId:string,tradeVersion:string,agreement:string>,
_2[0]: struct<_1:struct<tradeId:string,tradeVersion:string,values:array<double>>,_2:struct<tradeId:string,tradeVersion:string,agreement:string>>],
[tradeId#98,tradeVersion#99,agreement#100], [_1#103,_2#104]
   +- ConvertToSafe
      +- Sort [tradeId#98 ASC,tradeVersion#99 ASC,agreement#100 ASC], false, 0
         +- TungstenExchange
hashpartitioning(tradeId#98,tradeVersion#99,agreement#100,48), None
            +- ConvertToUnsafe
               +- !AppendColumns <function1>, class[_1[0]:
struct<tradeId:string,tradeVersion:string,values:array<double>>,
_2[0]: struct<tradeId:string,tradeVersion:string,agreement:string>],
class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]:
string], [tradeId#98,tradeVersion#99,agreement#100]
                  +- Project
[struct(tradeId#38,tradeVersion#39,values#40) AS
_1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74]
                     +- BroadcastHashJoin
[tradeId#38,tradeVersion#39], [tradeId#67,tradeVersion#68], BuildRight
                        :- ConvertToUnsafe
                        :  +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int,
values[0]: array<double>], class[tradeId[0]: string, tradeVersion[0]:
string, values[0]: array<double>],
[tradeId#38,tradeVersion#39,values#40]
                        :     +- ConvertToSafe
                        :        +- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
                        +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                           +- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
                              +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                                 +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, tradeStatus[0]: string,
portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]:
string, validFrom[0]: string, validTill[0]: string, tradeDate[0]:
string, maturity[0]: string, buySellIndicator[0]: string,
agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string],
[tradeId#67,tradeVersion#68,agreement#69]
                                    +- ConvertToSafe
                                       +- Scan ParquetRelation[hdfs://n....


//3. Cogroup

== Physical Plan ==
!CoGroup <function3>, class[_1[0]: string, _2[0]: string],
class[tradeId[0]: string, tradeVersion[0]: string, values[0]:
array<double>], class[tradeId[0]: string, tradeVersion[0]: string,
agreement[0]: string], class[_1[0]: string, _2[0]: string, _3[0]:
string, _4[0]: array<double>], [_1#133,_2#134,_3#135,_4#136],
[_1#119,_2#120], [_1#125,_2#126]
:- ConvertToSafe
:  +- Sort [_1#119 ASC,_2#120 ASC], false, 0
:     +- TungstenExchange hashpartitioning(_1#119,_2#120,48), None
:        +- ConvertToUnsafe
:           +- !AppendColumns <function1>, class[tradeId[0]: string,
tradeVersion[0]: string, values[0]: array<double>], class[_1[0]:
string, _2[0]: string], [_1#119,_2#120]
:              +- ConvertToUnsafe
:                 +- !MapPartitions <function1>, class[tradeId[0]:
string, tradeVersion[0]: string, resultType[0]: int, values[0]:
array<double>], class[tradeId[0]: string, tradeVersion[0]: string,
values[0]: array<double>], [tradeId#38,tradeVersion#39,values#40]
:                    +- ConvertToSafe
:                       +- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00000-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00001-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00002-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-00003-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
+- ConvertToSafe
   +- Sort [_1#125 ASC,_2#126 ASC], false, 0
      +- TungstenExchange hashpartitioning(_1#125,_2#126,48), None
         +- ConvertToUnsafe
            +- !AppendColumns <function1>, class[tradeId[0]: string,
tradeVersion[0]: string, agreement[0]: string], class[_1[0]: string,
_2[0]: string], [_1#125,_2#126]
               +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                  +- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
                     +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
                        +- !MapPartitions <function1>,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, tradeStatus[0]: string,
portfolio[0]: string, internalPortfolio[0]: string, ptsBook[0]:
string, validFrom[0]: string, validTill[0]: string, tradeDate[0]:
string, maturity[0]: string, buySellIndicator[0]: string,
agreement[0]: string], class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string],
[tradeId#67,tradeVersion#68,agreement#69]
                           +- ConvertToSafe
                              +- Scan ParquetRelation[hdfs://na ...]
PushedFilter: []
[tradeId#4,tradeVersion#5,tradeType#6,notional#7L,currency#8,asset#9,trader#10,productCode#11,counterParty#12,counterPartyAccronym#13,tradeStatus#14,portfolio#15,internalPortfolio#16,ptsBook#17,validFrom#18,validTill#19,tradeDate#20,maturity#21,buySellIndicator#22,agreement#23,date#24]



Re: How to make Dataset API as fast as DataFrame

Posted by Michael Armbrust <mi...@databricks.com>.
The focus of this release was to get the API out there, and there are a lot of
low-hanging performance optimizations left. That said, there is likely always
going to be some cost to materializing objects.

Another note: any time you're comparing performance, it's useful to include the
output of explain so we can sanity-check the chosen query plan.
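
For example, using the names from your snippet (extended mode also prints the
analyzed and optimized plans):

// DataFrame version
agg.explain(true)

// Dataset versions (via toDF() if explain is not available on the Dataset directly)
reduced.toDF().explain(true)
cogrouped1.toDF().explain(true)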
