You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by al...@apache.org on 2020/07/11 14:54:24 UTC
[incubator-spot] branch master updated: (SPOT-286) Do not call
org.apache.spark.sql.functions.broadcast() on large sets
This is an automated email from the ASF dual-hosted git repository.
alan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-spot.git
The following commit(s) were added to refs/heads/master by this push:
new 846cf94 (SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on large sets
new 2d60a2a Merge pull request #164 from jfn6030217/spot-286-remove-optimization
846cf94 is described below
commit 846cf94ade0cf696e3a2ca10044dfc92bc5e2039
Author: Jeremy Nelson <je...@digitalminion.com>
AuthorDate: Wed Jun 3 14:44:14 2020 -0500
(SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on large sets
https://issues.apache.org/jira/projects/SPOT/issues/SPOT-286
identifies an issue with calling org.apache.spark.sql.functions.broadcast()
on large datasets which may exceed a limit and cause an exception.
It was determined that wrapping a dataset in this way is a performance
optimization, and the justification for this optimization is not clear.
It was decided that it was preferable to undo the optimization rather than
allow it to fail in some situations.
---
.../org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala | 4 ++--
.../org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala | 6 +++---
.../scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala | 4 ++--
3 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index 7245acf..92734d0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -100,7 +100,7 @@ class DNSSuspiciousConnectsModel(inTopicCount: Int,
documentTopicMix))
inDF
- .join(org.apache.spark.sql.functions.broadcast(ipToTopicMix), inDF(ClientIP) === ipToTopicMix(DocumentName),
+ .join(ipToTopicMix, inDF(ClientIP) === ipToTopicMix(DocumentName),
"left_outer")
.selectExpr(inDF.schema.fieldNames :+ TopicProbabilityMix: _*)
.withColumn(Score, scoringUDF(DNSSuspiciousConnectsModel.modelColumns :+ col(TopicProbabilityMix): _*))
@@ -213,4 +213,4 @@ object DNSSuspiciousConnectsModel {
case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 4e09616..6637c87 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -64,13 +64,13 @@ class FlowSuspiciousConnectsModel(topicCount: Int,
*/
val dataWithSrcTopicMix = {
- val recordsWithSrcIPTopicMixes = flowRecords.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+ val recordsWithSrcIPTopicMixes = flowRecords.join(ipToTopicMix,
flowRecords(SourceIP) === ipToTopicMix(DocumentName), "left_outer")
val schemaWithSrcTopicMix = flowRecords.schema.fieldNames :+ TopicProbabilityMix
val dataWithSrcIpProb: DataFrame = recordsWithSrcIPTopicMixes.selectExpr(schemaWithSrcTopicMix: _*)
.withColumnRenamed(TopicProbabilityMix, SrcIpTopicMix)
- val recordsWithIPTopicMixes = dataWithSrcIpProb.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+ val recordsWithIPTopicMixes = dataWithSrcIpProb.join(ipToTopicMix,
dataWithSrcIpProb(DestinationIP) === ipToTopicMix(DocumentName), "left_outer")
val schema = dataWithSrcIpProb.schema.fieldNames :+ TopicProbabilityMix
recordsWithIPTopicMixes.selectExpr(schema: _*).withColumnRenamed(TopicProbabilityMix, DstIpTopicMix)
@@ -186,4 +186,4 @@ object FlowSuspiciousConnectsModel {
new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
}
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 7332fe4..d260ca0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -79,7 +79,7 @@ class ProxySuspiciousConnectsModel(topicCount: Int,
scoreFunction.score(precisionUtility)(documentTopicMix, word))
wordedDataFrame
- .join(org.apache.spark.sql.functions.broadcast(ipToTopicMIx), dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
+ .join(ipToTopicMIx, dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
.selectExpr(wordedDataFrame.schema.fieldNames :+ TopicProbabilityMix: _*)
.withColumn(Score, udfScoreFunction(col(TopicProbabilityMix), col(Word)))
.drop(TopicProbabilityMix)
@@ -213,4 +213,4 @@ object ProxySuspiciousConnectsModel {
.map({ case Row(ip, word) => ((ip.asInstanceOf[String], word.asInstanceOf[String]), 1) })
.reduceByKey(_ + _).map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
}
-}
\ No newline at end of file
+}