Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:15 UTC
[06/42] incubator-spot git commit: Spot-196: Changes: - TopDomains - small refactoring, deleted unused variable. - Decoupled SpotLDAWrapper into different components for better modularity. - Modified Flow, DNS and Proxy SuspiciousConnectsModel objects to
Spot-196: Changes:
- TopDomains - small refactoring, deleted unused variable.
- Decoupled SpotLDAWrapper into different components for better modularity.
- Modified the Flow, DNS and Proxy SuspiciousConnectsModel objects to adopt the new SpotLDAWrapper API
- Updated existing unit tests and added new ones to cover the SpotLDAWrapper changes (the new call sequence is sketched below)
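
For context, a minimal sketch of the refactored call sequence, assembled from the diffs below (ipWordCounts, config, logger and sparkSession stand in for the values each SuspiciousConnectsModel already has in scope):

    import org.apache.spot.lda.{SpotLDAHelper, SpotLDAModel, SpotLDAResult, SpotLDAWrapper}

    // Build the helper once; it caches the word/document dictionaries and the formatted corpus.
    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipWordCounts, config.precisionUtility, sparkSession)

    // Train: the wrapper now returns a SpotLDAModel instead of a SpotLDAOutput.
    val model: SpotLDAModel = SpotLDAWrapper.run(config.topicCount, logger, config.ldaPRGSeed,
      config.ldaAlpha, config.ldaBeta, config.ldaOptimizer, config.ldaMaxiterations, spotLDAHelper)

    // Score: the model formats its results for Spot scoring through the same helper.
    val results: SpotLDAResult = model.predict(spotLDAHelper)
    val ipToTopicMix = results.documentToTopicMix       // DataFrame
    val wordToPerTopicProb = results.wordToTopicMix     // Map[String, Array[Double]]
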
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/45c03ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/45c03ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/45c03ab6
Branch: refs/heads/SPOT-181_ODM
Commit: 45c03ab6ab97ffbd2a1228fb5dbd071511ce3c7b
Parents: 2ebe572
Author: Ricardo Barona <ri...@intel.com>
Authored: Thu Aug 3 14:02:13 2017 -0500
Committer: Ricardo Barona <ri...@intel.com>
Committed: Fri Oct 6 15:25:58 2017 -0500
----------------------------------------------------------------------
.../dns/model/DNSSuspiciousConnectsModel.scala | 43 ++--
.../org/apache/spot/lda/SpotLDAHelper.scala | 172 ++++++++++++++
.../org/apache/spot/lda/SpotLDAModel.scala | 140 +++++++++++
.../org/apache/spot/lda/SpotLDAResult.scala | 43 ++++
.../org/apache/spot/lda/SpotLDAWrapper.scala | 226 +++---------------
.../model/FlowSuspiciousConnectsModel.scala | 27 +--
.../proxy/ProxySuspiciousConnectsModel.scala | 25 +-
.../org/apache/spot/utilities/TopDomains.scala | 1 -
.../org/apache/spot/lda/SpotLDAHelperTest.scala | 133 +++++++++++
.../apache/spot/lda/SpotLDAWrapperTest.scala | 238 ++++++-------------
10 files changed, 636 insertions(+), 412 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index dfcb543..7245acf 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -19,22 +19,18 @@ package org.apache.spot.dns.model
import org.apache.log4j.Logger
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.dns.DNSSchema._
import org.apache.spot.dns.DNSWordCreation
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper._
import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda._
import org.apache.spot.utilities.DomainProcessor.DomainInfo
import org.apache.spot.utilities._
import org.apache.spot.utilities.data.validation.InvalidDataHandler
-import scala.util.{Failure, Success, Try}
-
/**
* A probabilistic model of the DNS queries issued by each client IP.
@@ -50,17 +46,17 @@ import scala.util.{Failure, Success, Try}
*
* Create these models using the factory in the companion object.
*
- * @param inTopicCount Number of topics to use in the topic model.
- * @param inIpToTopicMix Per-IP topic mix.
- * @param inWordToPerTopicProb Per-word, an array of probability of word given topic per topic.
+ * @param inTopicCount Number of topics to use in the topic model.
+ * @param inIpToTopicMix Per-IP topic mix.
+ * @param inWordToPerTopicProb Per-word, an array of probability of word given topic per topic.
*/
class DNSSuspiciousConnectsModel(inTopicCount: Int,
inIpToTopicMix: DataFrame,
inWordToPerTopicProb: Map[String, Array[Double]]) {
- val topicCount = inTopicCount
- val ipToTopicMix = inIpToTopicMix
- val wordToPerTopicProb = inWordToPerTopicProb
+ val topicCount: Int = inTopicCount
+ val ipToTopicMix: DataFrame = inIpToTopicMix
+ val wordToPerTopicProb: Map[String, Array[Double]] = inWordToPerTopicProb
/**
* Use a suspicious connects model to assign estimated probabilities to a dataframe of
@@ -128,7 +124,7 @@ object DNSSuspiciousConnectsModel {
QueryTypeField,
QueryResponseCodeField))
- val modelColumns = ModelSchema.fieldNames.toList.map(col)
+ val modelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
val DomainStatsSchema = StructType(List(TopDomainField, SubdomainLengthField, SubdomainEntropyField, NumPeriodsField))
@@ -136,7 +132,7 @@ object DNSSuspiciousConnectsModel {
* Create a new DNS Suspicious Connects model by training it on a data frame and a feedback file.
*
* @param sparkSession Spark Session
- * @param logger
+ * @param logger Application logger
* @param config Analysis configuration object containing CLI parameters.
* Contains the path to the feedback file in config.scoresFile
* @param inputRecords Data used to train the model.
@@ -155,7 +151,6 @@ object DNSSuspiciousConnectsModel {
config.feedbackFile,
config.duplicationFactor))
- val countryCodesBC = sparkSession.sparkContext.broadcast(CountryCodes.CountryCodes)
val topDomainsBC = sparkSession.sparkContext.broadcast(TopDomains.TopDomains)
val userDomain = config.userDomain
@@ -175,19 +170,20 @@ object DNSSuspiciousConnectsModel {
.reduceByKey(_ + _)
.map({ case ((ipDst, word), count) => SpotLDAInput(ipDst, word, count) })
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipDstWordCounts, config.precisionUtility, sparkSession)
- val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
- ipDstWordCounts,
- config.topicCount,
+ val model: SpotLDAModel = SpotLDAWrapper.run(config.topicCount,
logger,
config.ldaPRGSeed,
config.ldaAlpha,
config.ldaBeta,
config.ldaOptimizer,
config.ldaMaxiterations,
- config.precisionUtility)
+ spotLDAHelper)
+
+ val results: SpotLDAResult = model.predict(spotLDAHelper)
- new DNSSuspiciousConnectsModel(config.topicCount, ipToTopicMix, wordToPerTopicProb)
+ new DNSSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
}
@@ -205,15 +201,16 @@ object DNSSuspiciousConnectsModel {
userDomain: String,
url: String): TempFields = {
- val DomainInfo(_, topDomainClass, subdomain, subdomainLength, subdomainEntropy, numPeriods) =
+ val DomainInfo(_, topDomainClass, _, subDomainLength, subDomainEntropy, numPeriods) =
DomainProcessor.extractDomainInfo(url, topDomainsBC, userDomain)
TempFields(topDomainClass = topDomainClass,
- subdomainLength = subdomainLength,
- subdomainEntropy = subdomainEntropy,
+ subdomainLength = subDomainLength,
+ subdomainEntropy = subDomainEntropy,
numPeriods = numPeriods)
}
case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
new file mode 100644
index 0000000..8e771cb
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.utilities.{FloatPointPrecisionUtility, FloatPointPrecisionUtility64}
+
+import scala.collection.immutable.Map
+
+/**
+ * Apache Spot routines to format Spark LDA input and output for scoring.
+ */
+class SpotLDAHelper(private final val sparkSession: SparkSession,
+ final val docWordCount: RDD[SpotLDAInput],
+ final val documentDictionary: DataFrame,
+ final val wordDictionary: Map[String, Int],
+ final val precisionUtility: FloatPointPrecisionUtility = FloatPointPrecisionUtility64) extends Serializable {
+
+ /**
+ * Format document word count as RDD[(Long, Vector)] - input data for LDA algorithm
+ *
+ * @return RDD[(Long, Vector)]
+ */
+ val formattedCorpus: RDD[(Long, Vector)] = {
+ import sparkSession.implicits._
+
+ val getWordId = {
+ udf((word: String) => wordDictionary(word))
+ }
+
+ val docWordCountDF = docWordCount
+ .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
+ .toDF(DocumentName, WordName, WordNameWordCount)
+
+ // Convert SpotLDAInput into the desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
+ // is indexed by DocID
+ val wordCountsPerDocDF = docWordCountDF
+ .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
+ .drop(documentDictionary(DocumentName))
+ .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
+ .drop(WordName)
+
+ val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
+ = wordCountsPerDocDF
+ .select(DocumentNumber, WordNumber, WordNameWordCount)
+ .rdd
+ .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
+ .groupByKey
+
+ // Number of distinct words across the corpus (vocabulary size), used as the sparse vector size
+ val numUniqueWords = wordDictionary.size
+ val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
+ .mapValues(vs => Vectors.sparse(numUniqueWords, vs.toSeq))
+
+ ldaInput
+ }
+
+ /**
+ * Format LDA output topicDistribution for spot-ml scoring
+ *
+ * @param documentDistributions LDA model topicDistributions
+ * @return DataFrame
+ */
+ def formatDocumentDistribution(documentDistributions: RDD[(Long, Vector)]): DataFrame = {
+ import sparkSession.implicits._
+
+ val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
+ val documentToTopicDistributionDF = documentDistributions.toDF(DocumentNumber, TopicProbabilityMix)
+
+ val documentToTopicDistributionArray = documentToTopicDistributionDF
+ .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
+ .drop(documentDictionary(DocumentNumber))
+ .drop(documentToTopicDistributionDF(DocumentNumber))
+ .select(DocumentName, TopicProbabilityMix)
+ .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
+ .selectExpr(s"$DocumentName AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
+
+ precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
+ }
+
+ /**
+ * Format LDA output topicMatrix for spot-ml scoring
+ *
+ * @param topicsMatrix LDA model topicMatrix
+ * @return Map[String, Array[Double]]
+ **/
+ def formatTopicDistributions(topicsMatrix: Matrix): Map[String, Array[Double]] = {
+ // The incoming word-topic matrix is in column-major order and the columns are unnormalized
+ val m = topicsMatrix.numRows
+ val n = topicsMatrix.numCols
+ val reverseWordDictionary = wordDictionary.map(_.swap)
+
+ val columnSums: Array[Double] = Range(0, n).map(j => Range(0, m).map(i => topicsMatrix(i, j)).sum).toArray
+
+ val wordProbabilities: Seq[Array[Double]] = topicsMatrix.transpose.toArray.grouped(n).toSeq
+ .map(unNormalizedProbabilities => unNormalizedProbabilities.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
+
+ wordProbabilities.zipWithIndex
+ .map({ case (topicProbabilities, wordInd) => (reverseWordDictionary(wordInd), topicProbabilities) }).toMap
+ }
+
+}
+
+object SpotLDAHelper {
+
+ /**
+ * Factory method for a new SpotLDAHelper instance.
+ *
+ * @param docWordCount Document word count.
+ * @param precisionUtility FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit).
+ * @param sparkSession the Spark session.
+ * @return a new SpotLDAHelper instance.
+ */
+ def apply(docWordCount: RDD[SpotLDAInput],
+ precisionUtility: FloatPointPrecisionUtility,
+ sparkSession: SparkSession): SpotLDAHelper = {
+
+ import sparkSession.implicits._
+
+ val docWordCountCache = docWordCount.cache()
+
+ // Forcing an action to cache results.
+ docWordCountCache.count()
+
+ // Create a word -> index dictionary for later use
+ val wordDictionary: Map[String, Int] = {
+ val words = docWordCountCache
+ .map({ case SpotLDAInput(_, word, _) => word })
+ .distinct
+ .collect
+ words.zipWithIndex.toMap
+ }
+
+ val documentDictionary: DataFrame = docWordCountCache
+ .map({ case SpotLDAInput(doc, _, _) => doc })
+ .distinct
+ .zipWithIndex
+ .toDF(DocumentName, DocumentNumber)
+ .cache
+
+ new SpotLDAHelper(sparkSession, docWordCount, documentDictionary, wordDictionary, precisionUtility)
+ }
+
+}
+
+/**
+ * Spot LDA input case class
+ *
+ * @param doc Document name.
+ * @param word Word.
+ * @param count Number of times the word appears in the document.
+ */
+case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
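
A quick usage sketch for the new helper, mirroring SpotLDAHelperTest further down (the IPs and word strings are arbitrary test values):

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD
    import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput}
    import org.apache.spot.utilities.FloatPointPrecisionUtility64

    val documentWordData = sparkSession.sparkContext.parallelize(Seq(
      SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4)))

    val helper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)

    // One (document id, sparse vector of word counts) pair per distinct document.
    val corpus: RDD[(Long, Vector)] = helper.formattedCorpus
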
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
new file mode 100644
index 0000000..669bb69
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDAModel, LocalLDAModel}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Spot LDAModel.
+ */
+sealed trait SpotLDAModel {
+
+ /**
+ * Save the model to HDFS
+ *
+ * @param sparkSession the Spark session
+ * @param location the HDFS location
+ */
+ def save(sparkSession: SparkSession, location: String): Unit
+
+ /**
+ * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring
+ *
+ * @param helper Spot LDA Helper object
+ * @return SpotLDAResult
+ */
+ def predict(helper: SpotLDAHelper): SpotLDAResult
+}
+
+/**
+ * Spark LocalLDAModel wrapper.
+ *
+ * @param ldaModel Spark LDA Model
+ */
+class SpotLocalLDAModel(final val ldaModel: LDAModel) extends SpotLDAModel {
+
+ /**
+ * Save LocalLDAModel on HDFS location
+ *
+ * @param sparkSession the Spark session
+ * @param location the HDFS location
+ */
+ override def save(sparkSession: SparkSession, location: String): Unit = {
+ val sparkContext = sparkSession.sparkContext
+
+ ldaModel.save(sparkContext, location)
+ }
+
+ /**
+ * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+ * SpotLocalLDAModel.predict will use the corpus from spotLDAHelper, which can be a new set of documents or the
+ * same documents used for training.
+ *
+ * @param spotLDAHelper Spot LDA Helper object, can be the same used for training or a new instance with new
+ * documents.
+ * @return SpotLDAResult
+ */
+ override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+ val localLDAModel: LocalLDAModel = ldaModel.asInstanceOf[LocalLDAModel]
+
+ val topicDistributions = localLDAModel.topicDistributions(spotLDAHelper.formattedCorpus)
+ val topicMix = localLDAModel.topicsMatrix
+
+ SpotLDAResult(spotLDAHelper, topicDistributions, topicMix)
+ }
+}
+
+/** Spark DistributedLDAModel wrapper.
+ * Ideally, this model should be used only for batch processing.
+ *
+ * @param ldaModel Spark LDA Model
+ */
+class SpotDistributedLDAModel(final val ldaModel: LDAModel) extends SpotLDAModel {
+
+ /**
+ * Save DistributedLDAModel on HDFS location
+ *
+ * @param sparkSession the Spark session
+ * @param location the HDFS location
+ */
+ override def save(sparkSession: SparkSession, location: String): Unit = {
+ val sparkContext = sparkSession.sparkContext
+
+ ldaModel.save(sparkContext, location)
+ }
+
+ /**
+ * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+ * SpotDistributedLDAModel.predict uses the same documents that were used for training; it cannot predict on new
+ * documents. When passing spotLDAHelper, make sure it is the same object that was used for training.
+ *
+ * @param spotLDAHelper Spot LDA Helper object used for training
+ * @return SpotLDAResult
+ */
+ override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+ val distributedLDAModel: DistributedLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
+
+ val topicDistributions = distributedLDAModel.topicDistributions
+ val topicsMatrix = distributedLDAModel.topicsMatrix
+
+ SpotLDAResult(spotLDAHelper, topicDistributions, topicsMatrix)
+ }
+}
+
+object SpotLDAModel {
+
+ /**
+ * Factory method: based on the concrete type of ldaModel, returns a SpotDistributedLDAModel or a
+ * SpotLocalLDAModel wrapper.
+ *
+ * @param ldaModel Spark LDA model (DistributedLDAModel or LocalLDAModel).
+ * @param spotLDAHelper Spot LDA Helper object.
+ * @return SpotLDAModel
+ */
+ def apply(ldaModel: LDAModel, spotLDAHelper: SpotLDAHelper = null): SpotLDAModel = {
+
+ ldaModel match {
+ case model: DistributedLDAModel => new SpotDistributedLDAModel(model)
+ case model: LocalLDAModel => new SpotLocalLDAModel(model)
+ }
+ }
+}
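
To illustrate the persistence path these wrappers add, a hedged sketch of a save/load round trip (the HDFS path is a placeholder; SpotLDAWrapper.load appears in the next file):

    // Both wrappers delegate to Spark's LDAModel.save.
    model.save(sparkSession, "hdfs:///tmp/spot/lda-model")  // placeholder path

    // On reload, the optimizer option selects the concrete Spark model class ("em" or "online").
    val reloaded: SpotLDAModel = SpotLDAWrapper.load(sparkSession, "hdfs:///tmp/spot/lda-model", "em")
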
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
new file mode 100644
index 0000000..a91cee2
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+/**
+ * LDA results formatted for Apache Spot scoring.
+ *
+ */
+class SpotLDAResult(private final val helper: SpotLDAHelper,
+ final val topicDistributions: RDD[(Long, Vector)],
+ final val documentToTopicMix: DataFrame,
+ final val topicsMix: Matrix,
+ final val wordToTopicMix: Map[String, Array[Double]])
+
+object SpotLDAResult {
+
+ def apply(helper: SpotLDAHelper, topicDistributions: RDD[(Long, Vector)], topicsMix: Matrix): SpotLDAResult = {
+
+ val documentToTopicMix: DataFrame = helper.formatDocumentDistribution(topicDistributions)
+ val wordToTopicMix: Map[String, Array[Double]] = helper.formatTopicDistributions(topicsMix)
+
+ new SpotLDAResult(helper, topicDistributions, documentToTopicMix, topicsMix, wordToTopicMix)
+ }
+}
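
For illustration, how a caller consumes a SpotLDAResult (field names from this file; model and spotLDAHelper as in the sketches above, "cat" is a word taken from the tests below):

    val results: SpotLDAResult = model.predict(spotLDAHelper)

    // Per-document topic mix as a DataFrame keyed by document name.
    results.documentToTopicMix.show(5)

    // Per-word probabilities across topics, normalized per topic by the helper.
    val catTopics: Array[Double] = results.wordToTopicMix("cat")
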
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 122e8ed..7a8b67e 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -18,19 +18,15 @@
package org.apache.spot.lda
import org.apache.log4j.Logger
-import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.clustering.{LDAModel, _}
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spot.lda.SpotLDAWrapperSchema._
-import org.apache.spot.utilities.FloatPointPrecisionUtility
-
-import scala.collection.immutable.Map
+import org.apache.spark.sql.SparkSession
/**
* Spark LDA implementation
- * Contains routines for LDA using Scala Spark implementation from mllib
+ * Contains routines for LDA using Scala Spark implementation from org.apache.spark.mllib.clustering
* 1. Creates list of unique documents, words and model based on those two
* 2. Processes the model using Spark LDA
* 3. Reads Spark LDA results: Topic distributions per document (docTopicDist) and word distributions per topic (wordTopicMat)
@@ -42,8 +38,6 @@ object SpotLDAWrapper {
/**
* Runs Spark LDA and returns a new model.
*
- * @param sparkSession the SparkSession
- * @param docWordCount RDD with document list and the word count for each document (corpus)
* @param topicCount number of topics to find
* @param logger application logger
* @param ldaSeed LDA seed
@@ -51,51 +45,20 @@ object SpotLDAWrapper {
* @param ldaBeta topic concentration
* @param ldaOptimizerOption LDA optimizer, em or online
* @param maxIterations maximum number of iterations for the optimizer
- * @param precisionUtility FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
* @return
*/
- def runLDA(sparkSession: SparkSession,
- docWordCount: RDD[SpotLDAInput],
- topicCount: Int,
- logger: Logger,
- ldaSeed: Option[Long],
- ldaAlpha: Double,
- ldaBeta: Double,
- ldaOptimizerOption: String,
- maxIterations: Int,
- precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
-
- import sparkSession.implicits._
+ def run(topicCount: Int,
+ logger: Logger,
+ ldaSeed: Option[Long],
+ ldaAlpha: Double,
+ ldaBeta: Double,
+ ldaOptimizerOption: String,
+ maxIterations: Int,
+ helper: SpotLDAHelper): SpotLDAModel = {
- val docWordCountCache = docWordCount.cache()
-
- // Forcing an action to cache results.
- docWordCountCache.count()
-
- // Create word Map Word,Index for further usage
- val wordDictionary: Map[String, Int] = {
- val words = docWordCountCache
- .map({ case SpotLDAInput(doc, word, count) => word })
- .distinct
- .collect
- words.zipWithIndex.toMap
- }
-
- val documentDictionary: DataFrame = docWordCountCache
- .map({ case SpotLDAInput(doc, word, count) => doc })
- .distinct
- .zipWithIndex
- .toDF(DocumentName, DocumentNumber)
- .cache
// Structure corpus so that the index is the docID, values are the vectors of word occurrences in that doc
- val ldaCorpus: RDD[(Long, Vector)] =
- formatSparkLDAInput(docWordCountCache,
- documentDictionary,
- wordDictionary,
- sparkSession)
-
- docWordCountCache.unpersist()
+ val ldaCorpus: RDD[(Long, Vector)] = helper.formattedCorpus
// Instantiate optimizer based on input
val ldaOptimizer = ldaOptimizerOption match {
@@ -121,162 +84,35 @@ object SpotLDAWrapper {
.setBeta(ldaBeta)
.setOptimizer(ldaOptimizer)
- // If caller does not provide seed to lda, ie. ldaSeed is empty, lda is seeded automatically set to hash value of class name
-
+ // If the caller does not provide a seed (i.e. ldaSeed is empty),
+ // LDA is automatically seeded with the hash value of the class name
if (ldaSeed.nonEmpty) {
lda.setSeed(ldaSeed.get)
}
- val (wordTopicMat, docTopicDist) = ldaOptimizer match {
- case _: EMLDAOptimizer => {
- val ldaModel = lda.run(ldaCorpus).asInstanceOf[DistributedLDAModel]
-
- // Get word topic mix, from Spark documentation:
- // Inferred topics, where each topic is represented by a distribution over terms.
- // This is a matrix of size vocabSize x k, where each column is a topic.
- // No guarantees are given about the ordering of the topics.
- val wordTopicMat: Matrix = ldaModel.topicsMatrix
-
- // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
- // i is the fraction of the document which belongs to topic i
- val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions
-
- (wordTopicMat, docTopicDist)
-
- }
-
- case _: OnlineLDAOptimizer => {
- val ldaModel = lda.run(ldaCorpus).asInstanceOf[LocalLDAModel]
-
- // Get word topic mix, from Spark documentation:
- // Inferred topics, where each topic is represented by a distribution over terms.
- // This is a matrix of size vocabSize x k, where each column is a topic.
- // No guarantees are given about the ordering of the topics.
- val wordTopicMat: Matrix = ldaModel.topicsMatrix
-
- // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
- // i is the fraction of the document which belongs to topic i
- val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions(ldaCorpus)
-
- (wordTopicMat, docTopicDist)
-
- }
-
- }
-
- // Create doc results from vector: convert docID back to string, convert vector of probabilities to array
- val docToTopicMixDF =
- formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession, precisionUtility)
+ val model: LDAModel = lda.run(ldaCorpus)
- documentDictionary.unpersist()
-
- // Create word results from matrix: convert matrix to sequence, wordIDs back to strings, sequence of
- // probabilities to array
- val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
- val wordResults = formatSparkLDAWordOutput(wordTopicMat, revWordMap)
-
- // Create output object
- SpotLDAOutput(docToTopicMixDF, wordResults)
+ SpotLDAModel(model)
}
/**
- * Formats input data for LDA algorithm
+ * Load an existing model from an HDFS location.
*
- * @param docWordCount RDD with document list and the word count for each document (corpus)
- * @param documentDictionary DataFrame with a distinct list of documents and its id
- * @param wordDictionary immutable Map with distinct list of word and its id
- * @param sparkSession the SparkSession
- * @return
+ * @param sparkSession the Spark session.
+ * @param location the HDFS location for the model.
+ * @param ldaOptimizerOption LDA optimizer, em or online.
+ * @return SpotLDAModel
*/
- def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
- documentDictionary: DataFrame,
- wordDictionary: Map[String, Int],
- sparkSession: SparkSession): RDD[(Long, Vector)] = {
-
- import sparkSession.implicits._
+ def load(sparkSession: SparkSession, location: String, ldaOptimizerOption: String): SpotLDAModel = {
+ val sparkContext: SparkContext = sparkSession.sparkContext
- val getWordId = {
- udf((word: String) => (wordDictionary(word)))
+ val model = ldaOptimizerOption match {
+ case "em" => DistributedLDAModel.load(sparkContext, location)
+ case "online" => LocalLDAModel.load(sparkContext, location)
+ case _ => throw new IllegalArgumentException(
+ s"Invalid LDA optimizer $ldaOptimizerOption")
}
- val docWordCountDF = docWordCount
- .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
- .toDF(DocumentName, WordName, WordNameWordCount)
-
- // Convert SpotSparkLDAInput into desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
- // is indexed by DocID
- val wordCountsPerDocDF = docWordCountDF
- .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
- .drop(documentDictionary(DocumentName))
- .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
- .drop(WordName)
-
- val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
- = wordCountsPerDocDF
- .select(DocumentNumber, WordNumber, WordNameWordCount)
- .rdd
- .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
- .groupByKey
-
- // Sum of distinct words in each doc (words will be repeated between different docs), used for sparse vec size
- val numUniqueWords = wordDictionary.size
- val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
- .mapValues({ case vs => Vectors.sparse(numUniqueWords, vs.toSeq) })
-
- ldaInput
- }
-
- /**
- * Format LDA output topicMatrix for spot-ml scoring
- *
- * @param wordTopMat LDA model topicMatrix
- * @param wordMap immutable Map with distinct list of word and its id
- * @return
- */
- def formatSparkLDAWordOutput(wordTopMat: Matrix, wordMap: Map[Int, String]): scala.Predef.Map[String, Array[Double]] = {
-
- // incoming word top matrix is in column-major order and the columns are unnormalized
- val m = wordTopMat.numRows
- val n = wordTopMat.numCols
- val columnSums: Array[Double] = Range(0, n).map(j => (Range(0, m).map(i => wordTopMat(i, j)).sum)).toArray
-
- val wordProbs: Seq[Array[Double]] = wordTopMat.transpose.toArray.grouped(n).toSeq
- .map(unnormProbs => unnormProbs.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
-
- wordProbs.zipWithIndex.map({ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
+ SpotLDAModel(model)
}
-
- /**
- * Format LDA output topicDistribution for spot-ml scoring
- *
- * @param docTopDist LDA model topicDistribution
- * @param documentDictionary DataFrame with a distinct list of documents and its id
- * @param sparkSession the SparkSession
- * @param precisionUtility FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
- * @return
- */
- def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, sparkSession: SparkSession,
- precisionUtility: FloatPointPrecisionUtility):
- DataFrame = {
- import sparkSession.implicits._
-
- val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
- val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)
-
- val documentToTopicDistributionArray = documentToTopicDistributionDF
- .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
- .drop(documentDictionary(DocumentNumber))
- .drop(documentToTopicDistributionDF(DocumentNumber))
- .select(DocumentName, TopicProbabilityMix)
- .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
- .selectExpr(s"$DocumentName AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
-
- precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
- }
-
- case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
-
- case class SpotLDAOutput(docToTopicMix: DataFrame, wordResults: Map[String, Array[Double]])
-
}
\ No newline at end of file
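
A note on the optimizer switch, sketched under the assumptions stated in SpotLDAModel's docs: "em" produces a SpotDistributedLDAModel that can only score the corpus it was trained on, while "online" produces a SpotLocalLDAModel that can also score a fresh SpotLDAHelper built over new documents (all value names below are placeholders):

    // "em" -> SpotDistributedLDAModel: predict only with the training helper.
    val emModel = SpotLDAWrapper.run(topicCount, logger, seed, alpha, beta, "em", maxIter, trainHelper)
    val emResults = emModel.predict(trainHelper)

    // "online" -> SpotLocalLDAModel: a helper over new documents is also acceptable.
    val onlineModel = SpotLDAWrapper.run(topicCount, logger, seed, alpha, beta, "online", maxIter, trainHelper)
    val newHelper = SpotLDAHelper(newDocWordCounts, precisionUtility, sparkSession)
    val onlineResults = onlineModel.predict(newHelper)
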
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 6be11e1..4e09616 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -20,11 +20,10 @@ package org.apache.spot.netflow.model
import org.apache.log4j.Logger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
import org.apache.spot.netflow.FlowSchema._
import org.apache.spot.netflow.FlowWordCreator
import org.apache.spot.utilities.FloatPointPrecisionUtility
@@ -36,7 +35,7 @@ import org.apache.spot.utilities.data.validation.InvalidDataHandler
* The model uses a topic-modelling approach that:
* 1. Simplifies netflow records into words, one word at the source IP and another (possibly different) at the
* destination IP.
- * 2. The netflow words about each IP are treated as collections of thes words.
+ * 2. The netflow words about each IP are treated as collections of these words.
* 3. A topic modelling approach is used to infer a collection of "topics" that represent common profiles
* of network traffic. These "topics" are probability distributions on words.
* 4. Each IP has a mix of topics corresponding to its behavior.
@@ -112,7 +111,7 @@ class FlowSuspiciousConnectsModel(topicCount: Int,
}
/**
- * Contains dataframe schema information as well as the train-from-dataframe routine
+ * Contains DataFrame schema information as well as the train-from-dataframe routine
* (which is a kind of factory routine) for [[FlowSuspiciousConnectsModel]] instances.
*
*/
@@ -127,7 +126,7 @@ object FlowSuspiciousConnectsModel {
IbytField,
IpktField))
- val ModelColumns = ModelSchema.fieldNames.toList.map(col)
+ val ModelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
def trainModel(sparkSession: SparkSession,
@@ -146,13 +145,12 @@ object FlowSuspiciousConnectsModel {
config.duplicationFactor))
+ import sparkSession.implicits._
// simplify netflow log entries into "words"
val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
.withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))
- import sparkSession.implicits._
-
// Aggregate per-word counts at each IP
val srcWordCounts = dataWithWords
.filter(dataWithWords(SourceWord).notEqual(InvalidDataHandler.WordError))
@@ -173,20 +171,19 @@ object FlowSuspiciousConnectsModel {
.reduceByKey(_ + _)
.map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipWordCounts, config.precisionUtility, sparkSession)
- val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
- ipWordCounts,
- config.topicCount,
+ val model = SpotLDAWrapper.run(config.topicCount,
logger,
config.ldaPRGSeed,
config.ldaAlpha,
config.ldaBeta,
config.ldaOptimizer,
config.ldaMaxiterations,
- config.precisionUtility)
+ spotLDAHelper)
+
+ val results: SpotLDAResult = model.predict(spotLDAHelper)
- new FlowSuspiciousConnectsModel(config.topicCount,
- ipToTopicMix,
- wordToPerTopicProb)
+ new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 3ef60af..7332fe4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -25,9 +25,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.SuspiciousConnectsScoreFunction
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
import org.apache.spot.proxy.ProxySchema._
import org.apache.spot.utilities._
import org.apache.spot.utilities.data.validation.InvalidDataHandler
@@ -92,7 +91,7 @@ class ProxySuspiciousConnectsModel(topicCount: Int,
*/
object ProxySuspiciousConnectsModel {
- // These buckets are optimized to datasets used for training. Last bucket is of infinite size to ensure fit.
+ // These buckets are optimized to data sets used for training. Last bucket is of infinite size to ensure fit.
// The maximum value of entropy is given by log k where k is the number of distinct categories.
// Given that the alphabet and number of characters is finite the maximum value for entropy is upper bounded.
// Bucket number and size can be changed to provide less/more granularity
@@ -119,8 +118,8 @@ object ProxySuspiciousConnectsModel {
* for clustering in the topic model.
*
* @param sparkSession Spark Session
- * @param logger Logge object.
- * @param config SuspiciousConnetsArgumnetParser.Config object containg CLI arguments.
+ * @param logger Logger object.
+ * @param config SuspiciousConnectsArgumentParser.Config object containing CLI arguments.
* @param inputRecords Dataframe for training data, with columns Host, Time, ReqMethod, FullURI, ResponseContentType,
* UserAgent, RespCode (as defined in ProxySchema object).
* @return ProxySuspiciousConnectsModel
@@ -130,7 +129,7 @@ object ProxySuspiciousConnectsModel {
config: SuspiciousConnectsConfig,
inputRecords: DataFrame): ProxySuspiciousConnectsModel = {
- logger.info("training new proxy suspcious connects model")
+ logger.info("training new proxy suspicious connects model")
val selectedRecords =
@@ -145,24 +144,24 @@ object ProxySuspiciousConnectsModel {
.reduceByKey(_ + _).collect()
.toMap
- val agentToCountBC = sparkSession.sparkContext.broadcast(agentToCount)
-
val docWordCount: RDD[SpotLDAInput] =
getIPWordCounts(sparkSession, logger, selectedRecords, config.feedbackFile, config.duplicationFactor,
agentToCount)
- val SpotLDAOutput(ipToTopicMixDF, wordResults) = SpotLDAWrapper.runLDA(sparkSession,
- docWordCount,
- config.topicCount,
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(docWordCount, config.precisionUtility, sparkSession)
+
+ val model = SpotLDAWrapper.run(config.topicCount,
logger,
config.ldaPRGSeed,
config.ldaAlpha,
config.ldaBeta,
config.ldaOptimizer,
config.ldaMaxiterations,
- config.precisionUtility)
+ spotLDAHelper)
+
+ val results: SpotLDAResult = model.predict(spotLDAHelper)
- new ProxySuspiciousConnectsModel(config.topicCount, ipToTopicMixDF, wordResults)
+ new ProxySuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
index 0183027..083cfe7 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
@@ -28,7 +28,6 @@ object TopDomains {
val TopDomains: Set[String] = Source.fromFile(alexaTop1MPath).getLines.map(line => {
val parts = line.split(",")
- val l = parts.length
parts(1).split('.')(0)
}).toSet
}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
new file mode 100644
index 0000000..93828b2
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spot.lda.SpotLDAWrapperSchema.TopicProbabilityMix
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
+import org.scalatest.Matchers
+
+/**
+ * Created by rabarona on 7/17/17.
+ */
+class SpotLDAHelperTest extends TestingSparkContextFlatSpec with Matchers {
+
+ "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
+ "is the docID, values are the vectors of word occurrences in that doc" in {
+
+
+ val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+ SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+ SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+ SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+ val sparkLDAInput: RDD[(Long, Vector)] = spotLDAHelper.formattedCorpus
+ val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
+
+ sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(5.0, 8.0))), (2, Vectors.sparse(4, Array
+ (1), Array(2.0))), (1, Vectors.sparse(4, Array(2), Array(4.0))))
+ }
+
+ "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
+ "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
+
+ val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+ SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+ SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+ SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+ val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+ Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+ (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+ (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+ val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+ import testImplicits._
+ val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
+ docProbabilities)
+ }).collect
+
+ val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
+
+ documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
+ documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
+ documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
+
+ documentProbabilities(0) shouldBe a[java.lang.Double]
+
+ }
+
+ it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
+ "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
+
+ val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+ SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+ SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+ SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility32, sparkSession)
+
+ val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+ Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+ (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+ (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+ val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+ import testImplicits._
+ val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
+ docProbabilities)
+ }).collect
+
+ val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
+
+ documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
+ documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
+ documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
+
+ documentProbabilities(0) shouldBe a[java.lang.Float]
+ }
+
+ "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
+ "to strings, and sequence of probabilities to array" in {
+
+ val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
+
+ val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "23.0_7.0_7.0_4.0", 8),
+ SpotLDAInput("10.10.98.123", "80.0_7.0_7.0_4.0", 4),
+ SpotLDAInput("66.23.45.11", "333333.0_7.0_7.0_4.0", 2),
+ SpotLDAInput("192.168.1.2", "-1_23.0_7.0_7.0_4.0", 5)))
+
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+ val sparkWordRes = spotLDAHelper.formatTopicDistributions(testMat)
+
+ sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
+ sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
+ sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
+ sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
index 5c40068..7007ba1 100644
--- a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
@@ -18,25 +18,18 @@
package org.apache.spot.lda
import org.apache.log4j.{Level, LogManager}
-import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spot.lda.SpotLDAWrapper._
import org.apache.spot.lda.SpotLDAWrapperSchema._
import org.apache.spot.testutils.TestingSparkContextFlatSpec
import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
import org.scalatest.Matchers
-import scala.collection.immutable.Map
-
class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
"SparkLDA" should "handle an extremely unbalanced two word doc with EM optimizer" in {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
- val ldaAlpha = 1.02
+ val ldaAlpha = 1.02
val ldaBeta = 1.001
val ldaMaxIterations = 20
@@ -46,16 +39,20 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val dogWorld = SpotLDAInput("pets", "dog", 999)
val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
- val topicMixDF = out.docToTopicMix
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+ val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, spotLDAHelper)
+
+ val results = model.predict(spotLDAHelper)
+
+ val topicMixDF = results.documentToTopicMix
val topicMix =
- topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+ topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
.asInstanceOf[Seq[Double]].toArray
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
+ val catTopics = results.wordToTopicMix("cat")
+ val dogTopics = results.wordToTopicMix("dog")
Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -65,9 +62,9 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
- val ldaAlpha = 1.2
- val ldaBeta = 1.001
- val ldaMaxIterations = 20
+ val ldaAlpha = 1.002
+ val ldaBeta = 1.0001
+ val ldaMaxIterations = 100
val optimizer = "em"
@@ -75,20 +72,24 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val dogWorld = SpotLDAInput("dog world", "dog", 1)
val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
- val topicMixDF = out.docToTopicMix
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+ val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, spotLDAHelper)
+
+ val results = model.predict(spotLDAHelper)
+
+ val topicMixDF = results.documentToTopicMix
val dogTopicMix: Array[Double] =
topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
- .toSeq(0).asInstanceOf[Seq[Double]].toArray
+ .toSeq.head.asInstanceOf[Seq[Double]].toArray
val catTopicMix: Array[Double] =
topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
- .toSeq(0).asInstanceOf[Seq[Double]].toArray
+ .toSeq.head.asInstanceOf[Seq[Double]].toArray
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
+ val catTopics = results.wordToTopicMix("cat")
+ val dogTopics = results.wordToTopicMix("dog")
Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -98,7 +99,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
- val ldaAlpha = 0.0009
+ val ldaAlpha = 0.0009
val ldaBeta = 0.00001
val ldaMaxIterations = 400
@@ -108,16 +109,20 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val dogWorld = SpotLDAInput("pets", "dog", 999)
val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
- val topicMixDF = out.docToTopicMix
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+ val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, spotLDAHelper)
+
+ val results = model.predict(spotLDAHelper)
+
+ val topicMixDF = results.documentToTopicMix
val topicMix =
- topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+ topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
.asInstanceOf[Seq[Double]].toArray
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
+ val catTopics = results.wordToTopicMix("cat")
+ val dogTopics = results.wordToTopicMix("dog")
Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -127,7 +132,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
- val ldaAlpha = 0.0009
+ val ldaAlpha = 0.0009
val ldaBeta = 0.00001
val ldaMaxIterations = 400
val optimizer = "online"
@@ -136,20 +141,25 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val dogWorld = SpotLDAInput("dog world", "dog", 1)
val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
- val topicMixDF = out.docToTopicMix
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+ val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, spotLDAHelper)
+
+ val results = model.predict(spotLDAHelper)
+
+ val topicMixDF = results.documentToTopicMix
+
val dogTopicMix: Array[Double] =
topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
- .toSeq(0).asInstanceOf[Seq[Double]].toArray
+ .toSeq.head.asInstanceOf[Seq[Double]].toArray
val catTopicMix: Array[Double] =
topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
- .toSeq(0).asInstanceOf[Seq[Double]].toArray
+ .toSeq.head.asInstanceOf[Seq[Double]].toArray
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
+ val catTopics = results.wordToTopicMix("cat")
+ val dogTopics = results.wordToTopicMix("dog")
Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -159,7 +169,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
- val ldaAlpha = 1.02
+ val ldaAlpha = 1.02
val ldaBeta = 1.001
val ldaMaxIterations = 20
@@ -169,16 +179,20 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val dogWorld = SpotLDAInput("pets", "dog", 999)
val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
- val topicMixDF = out.docToTopicMix
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+ val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, spotLDAHelper)
+
+ val results = model.predict(spotLDAHelper)
+
+ val topicMixDF = results.documentToTopicMix
val topicMix =
- topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+ topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
.asInstanceOf[Seq[Float]].toArray
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
+ val catTopics = results.wordToTopicMix("cat")
+ val dogTopics = results.wordToTopicMix("dog")
Math.abs(topicMix(0).toDouble * catTopics(0) + topicMix(1).toDouble * catTopics(1)) should be < 0.01
Math.abs(0.999 - (topicMix(0).toDouble * dogTopics(0) + topicMix(1).toDouble * dogTopics(1))) should be < 0.01
@@ -188,7 +202,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)
- val ldaAlpha = 1.02
+ val ldaAlpha = 1.02
val ldaBeta = 1.001
val ldaMaxIterations = 20
@@ -198,134 +212,28 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
val dogWorld = SpotLDAInput("dog world", "dog", 1)
val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
- val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
- optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
- val topicMixDF = out.docToTopicMix
+ val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+ val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+ optimizer, ldaMaxIterations, spotLDAHelper)
+
+ val results = model.predict(spotLDAHelper)
+
+ val topicMixDF = results.documentToTopicMix
+
val dogTopicMix: Array[Float] =
- topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0)
+ topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq.head
.asInstanceOf[Seq[Float]].toArray
val catTopicMix: Array[Float] =
- topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0)
+ topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq.head
.asInstanceOf[Seq[Float]].toArray
- val catTopics = out.wordResults("cat")
- val dogTopics = out.wordResults("dog")
+ val catTopics = results.wordToTopicMix("cat")
+ val dogTopics = results.wordToTopicMix("dog")
Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
}
- "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
- "is the docID, values are the vectors of word occurrences in that doc" in {
-
-
- val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
- SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
- SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
- SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
- val wordDictionary = Map("333333_7.0_0.0_1.0" -> 0, "1111111_6.0_3.0_5.0" -> 1, "-1_43_7.0_2.0_6.0" -> 2,
- "-1_80_6.0_1.0_1.0" -> 3)
-
- val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
- .map({ case SpotLDAInput(doc, word, count) => doc })
- .distinct
- .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-
- val sparkLDAInput: RDD[(Long, Vector)] = SpotLDAWrapper.formatSparkLDAInput(documentWordData,
- documentDictionary, wordDictionary, sparkSession)
- val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
-
- sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(8.0, 5.0))), (2, Vectors.sparse(4, Array
- (2), Array(2.0))), (1, Vectors.sparse(4, Array(1), Array(4.0))))
- }
-
- "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
- "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
-
- val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
- SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
- SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
- SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
- val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
- .map({ case SpotLDAInput(doc, word, count) => doc })
- .distinct
- .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
- val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
- Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
- (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
- (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
- val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
- FloatPointPrecisionUtility64)
-
- import testImplicits._
- val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
- docProbabilities)
- }).collect
-
- val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
-
- documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
- documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
- documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
-
- documentProbabilities(0) shouldBe a[java.lang.Double]
-
- }
-
- it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
- "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
-
- val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
- SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
- SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
- SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
- val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
- .map({ case SpotLDAInput(doc, word, count) => doc })
- .distinct
- .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
- val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
- Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
- (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
- (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
- val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
- FloatPointPrecisionUtility32)
-
- import testImplicits._
- val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
- docProbabilities)
- }).collect
-
- val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
-
- documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
- documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
- documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
-
- documentProbabilities(0) shouldBe a[java.lang.Float]
- }
-
- "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
- "to strings, and sequence of probabilities to array" in {
- val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
-
- val wordDictionary = Map("-1_23.0_7.0_7.0_4.0" -> 3, "23.0_7.0_7.0_4.0" -> 0, "333333.0_7.0_7.0_4.0" -> 2, "80.0_7.0_7.0_4.0" -> 1)
- val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
- val sparkWordRes = formatSparkLDAWordOutput(testMat, revWordMap)
-
- sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
- sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
- sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
- sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
- }
}
\ No newline at end of file
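----------------------------------------------------------------------
For readers following the test changes above, the refactored call path can be
summarized as a short sketch. This is reconstructed only from the "+" lines in
SpotLDAWrapperTest: the import locations, the exact signatures of
SpotLDAHelper.apply, SpotLDAWrapper.run and SpotLDAModel.predict, and the
in-scope sparkSession (provided by TestingSparkContextFlatSpec in the tests)
are assumptions based on this diff, not a verified public API.

// Hedged sketch of the new train/score split, mirroring the tests above.
// Assumed imports; SpotLDAInput, SpotLDAHelper, SpotLDAModel and
// FloatPointPrecisionUtility64 locations are inferred from this commit's diffs.
import org.apache.log4j.{Level, LogManager}
import org.apache.spot.lda._
import org.apache.spot.utilities.FloatPointPrecisionUtility64

val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
logger.setLevel(Level.WARN)

// Word counts per document, as in the tests.
val data = sparkSession.sparkContext.parallelize(Seq(
  SpotLDAInput("cat fancy", "cat", 1),
  SpotLDAInput("dog world", "dog", 1)))

// 1. Build the helper once; it carries the corpus state (document/word
//    dictionaries) and the floating-point precision of the output.
val spotLDAHelper: SpotLDAHelper =
  SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)

// 2. Train. Parameter order follows the tests: topic count, logger,
//    optional seed, alpha, beta, optimizer, max iterations, helper.
val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef),
  0.0009, 0.00001, "online", 400, spotLDAHelper)

// 3. Score. predict returns the per-document and per-word topic mixes
//    that runLDA previously returned in one shot.
val results = model.predict(spotLDAHelper)
val docTopics = results.documentToTopicMix    // DataFrame keyed by DocumentName
val dogTopics = results.wordToTopicMix("dog") // topic distribution for one word

The visible design change in these tests is that training (SpotLDAWrapper.run)
and scoring (SpotLDAModel.predict) are now separate steps, with SpotLDAHelper
carrying the shared corpus state between them, whereas the removed
formatSparkLDAInput / formatSparkLDADocTopicOutput / formatSparkLDAWordOutput
tests covered conversion code that previously lived inside SpotLDAWrapper.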