You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by al...@apache.org on 2020/07/11 14:54:24 UTC

[incubator-spot] branch master updated: (SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on large sets

This is an automated email from the ASF dual-hosted git repository.

alan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-spot.git


The following commit(s) were added to refs/heads/master by this push:
     new 846cf94  (SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on large sets
     new 2d60a2a  Merge pull request #164 from jfn6030217/spot-286-remove-optimization
846cf94 is described below

commit 846cf94ade0cf696e3a2ca10044dfc92bc5e2039
Author: Jeremy Nelson <je...@digitalminion.com>
AuthorDate: Wed Jun 3 14:44:14 2020 -0500

    (SPOT-286) Do not call org.apache.spark.sql.functions.broadcast() on large sets
    
    https://issues.apache.org/jira/projects/SPOT/issues/SPOT-286
    identifies an issue with calling org.apache.spark.sql.functions.broadcast()
    on large datasets which may exceed a limit and cause an exception.
    
    It was determined that wrapping a dataset in this way is a performance
    optimization, and the justification for this optimization is not clear.
    
    It was decided that it was preferable to undo the optimization rather
    than allow it to fail in some situations.
---
 .../org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala      | 4 ++--
 .../org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala | 6 +++---
 .../scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala  | 4 ++--
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index 7245acf..92734d0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -100,7 +100,7 @@ class DNSSuspiciousConnectsModel(inTopicCount: Int,
         documentTopicMix))
 
     inDF
-      .join(org.apache.spark.sql.functions.broadcast(ipToTopicMix), inDF(ClientIP) === ipToTopicMix(DocumentName),
+      .join(ipToTopicMix, inDF(ClientIP) === ipToTopicMix(DocumentName),
         "left_outer")
       .selectExpr(inDF.schema.fieldNames :+ TopicProbabilityMix: _*)
       .withColumn(Score, scoringUDF(DNSSuspiciousConnectsModel.modelColumns :+ col(TopicProbabilityMix): _*))
@@ -213,4 +213,4 @@ object DNSSuspiciousConnectsModel {
 
   case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
 
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 4e09616..6637c87 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -64,13 +64,13 @@ class FlowSuspiciousConnectsModel(topicCount: Int,
       */
     val dataWithSrcTopicMix = {
 
-      val recordsWithSrcIPTopicMixes = flowRecords.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+      val recordsWithSrcIPTopicMixes = flowRecords.join(ipToTopicMix,
         flowRecords(SourceIP) === ipToTopicMix(DocumentName), "left_outer")
       val schemaWithSrcTopicMix = flowRecords.schema.fieldNames :+ TopicProbabilityMix
       val dataWithSrcIpProb: DataFrame = recordsWithSrcIPTopicMixes.selectExpr(schemaWithSrcTopicMix: _*)
         .withColumnRenamed(TopicProbabilityMix, SrcIpTopicMix)
 
-      val recordsWithIPTopicMixes = dataWithSrcIpProb.join(org.apache.spark.sql.functions.broadcast(ipToTopicMix),
+      val recordsWithIPTopicMixes = dataWithSrcIpProb.join(ipToTopicMix,
         dataWithSrcIpProb(DestinationIP) === ipToTopicMix(DocumentName), "left_outer")
       val schema = dataWithSrcIpProb.schema.fieldNames :+ TopicProbabilityMix
       recordsWithIPTopicMixes.selectExpr(schema: _*).withColumnRenamed(TopicProbabilityMix, DstIpTopicMix)
@@ -186,4 +186,4 @@ object FlowSuspiciousConnectsModel {
 
     new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
   }
-}
\ No newline at end of file
+}
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 7332fe4..d260ca0 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -79,7 +79,7 @@ class ProxySuspiciousConnectsModel(topicCount: Int,
       scoreFunction.score(precisionUtility)(documentTopicMix, word))
 
     wordedDataFrame
-      .join(org.apache.spark.sql.functions.broadcast(ipToTopicMIx), dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
+      .join(ipToTopicMIx, dataFrame(ClientIP) === ipToTopicMIx(DocumentName), "left_outer")
       .selectExpr(wordedDataFrame.schema.fieldNames :+ TopicProbabilityMix: _*)
       .withColumn(Score, udfScoreFunction(col(TopicProbabilityMix), col(Word)))
       .drop(TopicProbabilityMix)
@@ -213,4 +213,4 @@ object ProxySuspiciousConnectsModel {
       .map({ case Row(ip, word) => ((ip.asInstanceOf[String], word.asInstanceOf[String]), 1) })
       .reduceByKey(_ + _).map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
   }
-}
\ No newline at end of file
+}