Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 19:28:15 UTC

[06/42] incubator-spot git commit: Spot-196: Changes: - TopDomains - small refactoring, deleted unused variable. - Decoupled SpotLDAWrapper into different components for better modularity. - Modified Flow, DNS and Proxy SuspiciousConnectsModel objects to

Spot-196: Changes:
- TopDomains - small refactoring, deleted unused variable.
- Decoupled SpotLDAWrapper into different components for better modularity.
- Modified Flow, DNS and Proxy SuspiciousConnectsModel objects to implement changes on SpotLDAWrapper
- Updated unit tests and implemented new ones to cover changes on SpotLDAWrapper
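
A minimal sketch of how the decoupled pieces now compose, based on the signatures introduced in this commit (sparkSession, logger and config are assumed to be in scope, as in the SuspiciousConnectsModel factories below; the SpotLDAInput words are made-up placeholders):

    import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAModel, SpotLDAResult, SpotLDAWrapper}

    // Corpus: one SpotLDAInput(doc, word, count) record per (document, word) pair.
    val docWordCounts = sparkSession.sparkContext.parallelize(Seq(
      SpotLDAInput("192.168.1.1", "word_a", 8),
      SpotLDAInput("10.10.98.123", "word_b", 4)))

    // 1. Build the helper; it caches the corpus and derives the word and document dictionaries.
    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(docWordCounts, config.precisionUtility, sparkSession)

    // 2. Train; SpotLDAWrapper.run returns a SpotLDAModel (local or distributed, depending on the optimizer).
    val model: SpotLDAModel = SpotLDAWrapper.run(config.topicCount, logger, config.ldaPRGSeed,
      config.ldaAlpha, config.ldaBeta, config.ldaOptimizer, config.ldaMaxiterations, spotLDAHelper)

    // 3. Score; predict formats the results for Apache Spot scoring.
    val results: SpotLDAResult = model.predict(spotLDAHelper)
    val ipToTopicMix = results.documentToTopicMix            // DataFrame
    val wordToPerTopicProb = results.wordToTopicMix          // Map[String, Array[Double]]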


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/45c03ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/45c03ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/45c03ab6

Branch: refs/heads/SPOT-181_ODM
Commit: 45c03ab6ab97ffbd2a1228fb5dbd071511ce3c7b
Parents: 2ebe572
Author: Ricardo Barona <ri...@intel.com>
Authored: Thu Aug 3 14:02:13 2017 -0500
Committer: Ricardo Barona <ri...@intel.com>
Committed: Fri Oct 6 15:25:58 2017 -0500

----------------------------------------------------------------------
 .../dns/model/DNSSuspiciousConnectsModel.scala  |  43 ++--
 .../org/apache/spot/lda/SpotLDAHelper.scala     | 172 ++++++++++++++
 .../org/apache/spot/lda/SpotLDAModel.scala      | 140 +++++++++++
 .../org/apache/spot/lda/SpotLDAResult.scala     |  43 ++++
 .../org/apache/spot/lda/SpotLDAWrapper.scala    | 226 +++---------------
 .../model/FlowSuspiciousConnectsModel.scala     |  27 +--
 .../proxy/ProxySuspiciousConnectsModel.scala    |  25 +-
 .../org/apache/spot/utilities/TopDomains.scala  |   1 -
 .../org/apache/spot/lda/SpotLDAHelperTest.scala | 133 +++++++++++
 .../apache/spot/lda/SpotLDAWrapperTest.scala    | 238 ++++++-------------
 10 files changed, 636 insertions(+), 412 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
index dfcb543..7245acf 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/model/DNSSuspiciousConnectsModel.scala
@@ -19,22 +19,18 @@ package org.apache.spot.dns.model
 
 import org.apache.log4j.Logger
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.DNSWordCreation
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper._
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda._
 import org.apache.spot.utilities.DomainProcessor.DomainInfo
 import org.apache.spot.utilities._
 import org.apache.spot.utilities.data.validation.InvalidDataHandler
 
-import scala.util.{Failure, Success, Try}
-
 
 /**
   * A probabilistic model of the DNS queries issued by each client IP.
@@ -50,17 +46,17 @@ import scala.util.{Failure, Success, Try}
   *
   * Create these models using the  factory in the companion object.
   *
-  * @param inTopicCount          Number of topics to use in the topic model.
-  * @param inIpToTopicMix        Per-IP topic mix.
-  * @param inWordToPerTopicProb  Per-word,  an array of probability of word given topic per topic.
+  * @param inTopicCount         Number of topics to use in the topic model.
+  * @param inIpToTopicMix       Per-IP topic mix.
+  * @param inWordToPerTopicProb Per-word,  an array of probability of word given topic per topic.
   */
 class DNSSuspiciousConnectsModel(inTopicCount: Int,
                                  inIpToTopicMix: DataFrame,
                                  inWordToPerTopicProb: Map[String, Array[Double]]) {
 
-  val topicCount = inTopicCount
-  val ipToTopicMix = inIpToTopicMix
-  val wordToPerTopicProb = inWordToPerTopicProb
+  val topicCount: Int = inTopicCount
+  val ipToTopicMix: DataFrame = inIpToTopicMix
+  val wordToPerTopicProb: Map[String, Array[Double]] = inWordToPerTopicProb
 
   /**
     * Use a suspicious connects model to assign estimated probabilities to a dataframe of
@@ -128,7 +124,7 @@ object DNSSuspiciousConnectsModel {
     QueryTypeField,
     QueryResponseCodeField))
 
-  val modelColumns = ModelSchema.fieldNames.toList.map(col)
+  val modelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
 
   val DomainStatsSchema = StructType(List(TopDomainField, SubdomainLengthField, SubdomainEntropyField, NumPeriodsField))
 
@@ -136,7 +132,7 @@ object DNSSuspiciousConnectsModel {
     * Create a new DNS Suspicious Connects model by training it on a data frame and a feedback file.
     *
     * @param sparkSession Spark Session
-    * @param logger
+    * @param logger       Application logger
     * @param config       Analysis configuration object containing CLI parameters.
     *                     Contains the path to the feedback file in config.scoresFile
     * @param inputRecords Data used to train the model.
@@ -155,7 +151,6 @@ object DNSSuspiciousConnectsModel {
       config.feedbackFile,
       config.duplicationFactor))
 
-    val countryCodesBC = sparkSession.sparkContext.broadcast(CountryCodes.CountryCodes)
     val topDomainsBC = sparkSession.sparkContext.broadcast(TopDomains.TopDomains)
     val userDomain = config.userDomain
 
@@ -175,19 +170,20 @@ object DNSSuspiciousConnectsModel {
         .reduceByKey(_ + _)
         .map({ case ((ipDst, word), count) => SpotLDAInput(ipDst, word, count) })
 
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipDstWordCounts, config.precisionUtility, sparkSession)
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
-      ipDstWordCounts,
-      config.topicCount,
+    val model: SpotLDAModel = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
+
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
 
-    new DNSSuspiciousConnectsModel(config.topicCount, ipToTopicMix, wordToPerTopicProb)
+    new DNSSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
 
   }
 
@@ -205,15 +201,16 @@ object DNSSuspiciousConnectsModel {
                        userDomain: String,
                        url: String): TempFields = {
 
-    val DomainInfo(_, topDomainClass, subdomain, subdomainLength, subdomainEntropy, numPeriods) =
+    val DomainInfo(_, topDomainClass, _, subDomainLength, subDomainEntropy, numPeriods) =
       DomainProcessor.extractDomainInfo(url, topDomainsBC, userDomain)
 
 
     TempFields(topDomainClass = topDomainClass,
-      subdomainLength = subdomainLength,
-      subdomainEntropy = subdomainEntropy,
+      subdomainLength = subDomainLength,
+      subdomainEntropy = subDomainEntropy,
       numPeriods = numPeriods)
   }
 
   case class TempFields(topDomainClass: Int, subdomainLength: Integer, subdomainEntropy: Double, numPeriods: Integer)
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
new file mode 100644
index 0000000..8e771cb
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAHelper.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.utilities.{FloatPointPrecisionUtility, FloatPointPrecisionUtility64}
+
+import scala.collection.immutable.Map
+
+/**
+  * Apache Spot routines to format Spark LDA input and output for scoring.
+  */
+class SpotLDAHelper(private final val sparkSession: SparkSession,
+                    final val docWordCount: RDD[SpotLDAInput],
+                    final val documentDictionary: DataFrame,
+                    final val wordDictionary: Map[String, Int],
+                    final val precisionUtility: FloatPointPrecisionUtility = FloatPointPrecisionUtility64) extends Serializable {
+
+  /**
+    * Format document word count as RDD[(Long, Vector)] - input data for LDA algorithm
+    *
+    * @return RDD[(Long, Vector)]
+    */
+  val formattedCorpus: RDD[(Long, Vector)] = {
+    import sparkSession.implicits._
+
+    val getWordId = {
+      udf((word: String) => wordDictionary(word))
+    }
+
+    val docWordCountDF = docWordCount
+      .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
+      .toDF(DocumentName, WordName, WordNameWordCount)
+
+    // Convert SpotLDAInput into the desired format for Spark LDA: (doc, word, count) -> word count per doc, where the
+    // RDD is indexed by DocID
+    val wordCountsPerDocDF = docWordCountDF
+      .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
+      .drop(documentDictionary(DocumentName))
+      .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
+      .drop(WordName)
+
+    val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
+    = wordCountsPerDocDF
+      .select(DocumentNumber, WordNumber, WordNameWordCount)
+      .rdd
+      .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
+      .groupByKey
+
+    // Total number of distinct words across the corpus, used as the sparse vector size
+    val numUniqueWords = wordDictionary.size
+    val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
+      .mapValues(vs => Vectors.sparse(numUniqueWords, vs.toSeq))
+
+    ldaInput
+  }
+
+  /**
+    * Format LDA output topicDistribution for spot-ml scoring
+    *
+    * @param documentDistributions LDA model topicDistributions
+    * @return DataFrame
+    */
+  def formatDocumentDistribution(documentDistributions: RDD[(Long, Vector)]): DataFrame = {
+    import sparkSession.implicits._
+
+    val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
+    val documentToTopicDistributionDF = documentDistributions.toDF(DocumentNumber, TopicProbabilityMix)
+
+    val documentToTopicDistributionArray = documentToTopicDistributionDF
+      .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
+      .drop(documentDictionary(DocumentNumber))
+      .drop(documentToTopicDistributionDF(DocumentNumber))
+      .select(DocumentName, TopicProbabilityMix)
+      .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
+      .selectExpr(s"$DocumentName  AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
+
+    precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
+  }
+
+  /**
+    * Format LDA output topicMatrix for spot-ml scoring
+    *
+    * @param topicsMatrix LDA model topicMatrix
+    * @return Map[String, Array[Double]]
+    **/
+  def formatTopicDistributions(topicsMatrix: Matrix): Map[String, Array[Double]] = {
+    // Incoming word-topic matrix is in column-major order and the columns are unnormalized
+    val m = topicsMatrix.numRows
+    val n = topicsMatrix.numCols
+    val reverseWordDictionary = wordDictionary.map(_.swap)
+
+    val columnSums: Array[Double] = Range(0, n).map(j => Range(0, m).map(i => topicsMatrix(i, j)).sum).toArray
+
+    val wordProbabilities: Seq[Array[Double]] = topicsMatrix.transpose.toArray.grouped(n).toSeq
+      .map(unNormalizedProbabilities => unNormalizedProbabilities.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
+
+    wordProbabilities.zipWithIndex
+      .map({ case (topicProbabilities, wordInd) => (reverseWordDictionary(wordInd), topicProbabilities) }).toMap
+  }
+
+}
+
+object SpotLDAHelper {
+
+  /**
+    * Factory method for a new SpotLDAHelper instance.
+    *
+    * @param docWordCount     Document word count (corpus) as an RDD of SpotLDAInput.
+    * @param precisionUtility FloatPointPrecisionUtility implementation (64 or 32 bit) used when formatting results.
+    * @param sparkSession     the Spark session.
+    * @return a SpotLDAHelper holding the corpus and the word and document dictionaries built from it.
+    */
+  def apply(docWordCount: RDD[SpotLDAInput],
+            precisionUtility: FloatPointPrecisionUtility,
+            sparkSession: SparkSession): SpotLDAHelper = {
+
+    import sparkSession.implicits._
+
+    val docWordCountCache = docWordCount.cache()
+
+    // Forcing an action to cache results.
+    docWordCountCache.count()
+
+    // Create a Map[Word, Index] for further usage
+    val wordDictionary: Map[String, Int] = {
+      val words = docWordCountCache
+        .map({ case SpotLDAInput(_, word, _) => word })
+        .distinct
+        .collect
+      words.zipWithIndex.toMap
+    }
+
+    val documentDictionary: DataFrame = docWordCountCache
+      .map({ case SpotLDAInput(doc, _, _) => doc })
+      .distinct
+      .zipWithIndex
+      .toDF(DocumentName, DocumentNumber)
+      .cache
+
+    new SpotLDAHelper(sparkSession, docWordCount, documentDictionary, wordDictionary, precisionUtility)
+  }
+
+}
+
+/**
+  * Spot LDA input case class
+  *
+  * @param doc   Document name.
+  * @param word  Word.
+  * @param count Number of times the word appears in the document.
+  */
+case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
new file mode 100644
index 0000000..669bb69
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAModel.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDAModel, LocalLDAModel}
+import org.apache.spark.sql.SparkSession
+
+/**
+  * Spot LDAModel.
+  */
+sealed trait SpotLDAModel {
+
+  /**
+    * Save the model to HDFS
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location to save the model to
+    */
+  def save(sparkSession: SparkSession, location: String): Unit
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring
+    *
+    * @param helper SpotLDAHelper providing the formatted corpus to score
+    * @return SpotLDAResult with the document and word topic mixes
+    */
+  def predict(helper: SpotLDAHelper): SpotLDAResult
+}
+
+/**
+  * Spark LocalLDAModel wrapper.
+  *
+  * @param ldaModel Spark LDA Model
+  */
+class SpotLocalLDAModel(final val ldaModel: LDAModel) extends SpotLDAModel {
+
+  /**
+    * Save LocalLDAModel on HDFS location
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location
+    */
+  override def save(sparkSession: SparkSession, location: String): Unit = {
+    val sparkContext = sparkSession.sparkContext
+
+    ldaModel.save(sparkContext, location)
+  }
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+    * SpotLocalLDAModel.predict uses the corpus from spotLDAHelper, which can be a new set of documents or the same
+    * documents used for training.
+    *
+    * @param spotLDAHelper Spot LDA Helper object, can be the same used for training or a new instance with new
+    *                      documents.
+    * @return SpotLDAResult
+    */
+  override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+    val localLDAModel: LocalLDAModel = ldaModel.asInstanceOf[LocalLDAModel]
+
+    val topicDistributions = localLDAModel.topicDistributions(spotLDAHelper.formattedCorpus)
+    val topicMix = localLDAModel.topicsMatrix
+
+    SpotLDAResult(spotLDAHelper, topicDistributions, topicMix)
+  }
+}
+
+/** Spark DistributedLDAModel wrapper.
+  * Ideally, this model should be used only for batch processing.
+  *
+  * @param ldaModel Spark LDA Model
+  */
+class SpotDistributedLDAModel(final val ldaModel: LDAModel) extends
+  SpotLDAModel {
+
+  /**
+    * Save DistributedLDAModel on HDFS location
+    *
+    * @param sparkSession the Spark session
+    * @param location     the HDFS location
+    */
+  override def save(sparkSession: SparkSession, location: String): Unit = {
+    val sparkContext = sparkSession.sparkContext
+
+    ldaModel.save(sparkContext, location)
+  }
+
+  /**
+    * Predict topicDistributions and get topicsMatrix along with results formatted for Apache Spot scoring.
+    * SpotDistributedLDAModel.predict uses the same documents that were used for training; it cannot predict on new
+    * documents. When passing spotLDAHelper, make sure it is the same instance that was used for training.
+    *
+    * @param spotLDAHelper Spot LDA Helper object used for training
+    * @return SpotLDAResult
+    */
+  override def predict(spotLDAHelper: SpotLDAHelper): SpotLDAResult = {
+
+    val distributedLDAModel: DistributedLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
+
+    val topicDistributions = distributedLDAModel.topicDistributions
+    val topicsMatrix = distributedLDAModel.topicsMatrix
+
+    SpotLDAResult(spotLDAHelper, topicDistributions, topicsMatrix)
+  }
+}
+
+object SpotLDAModel {
+
+  /**
+    * Factory method: based on the runtime type of ldaModel, returns either a SpotDistributedLDAModel or a
+    * SpotLocalLDAModel wrapper.
+    *
+    * @param ldaModel      Spark LDAModel (DistributedLDAModel or LocalLDAModel).
+    * @param spotLDAHelper SpotLDAHelper used for training (not used by the factory itself).
+    * @return SpotLDAModel wrapping the given Spark model.
+    */
+  def apply(ldaModel: LDAModel, spotLDAHelper: SpotLDAHelper = null): SpotLDAModel = {
+
+    ldaModel match {
+      case model: DistributedLDAModel => new SpotDistributedLDAModel(model)
+      case model: LocalLDAModel => new SpotLocalLDAModel(model)
+    }
+  }
+}
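
A minimal usage sketch of the distinction the factory above encodes: per the SpotLocalLDAModel.predict scaladoc, an online-trained model (LocalLDAModel) can score a corpus other than the training one by wrapping the new documents in a fresh helper. Assumes model comes from SpotLDAWrapper.run with ldaOptimizerOption = "online"; newDocWordCounts is a hypothetical RDD[SpotLDAInput] of new documents.

    // Score new documents against an already-trained LocalLDAModel-backed SpotLDAModel.
    val newHelper: SpotLDAHelper = SpotLDAHelper(newDocWordCounts, config.precisionUtility, sparkSession)
    val newResults: SpotLDAResult = model.predict(newHelper)
    val newDocumentToTopicMix = newResults.documentToTopicMix

    // A DistributedLDAModel-backed SpotLDAModel, by contrast, only returns topic distributions for the
    // training documents, so the helper passed to predict should be the one used for training.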

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
new file mode 100644
index 0000000..a91cee2
--- /dev/null
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAResult.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+/**
+  * LDA results formatted for Apache Spot scoring.
+  *
+  */
+class SpotLDAResult(private final val helper: SpotLDAHelper,
+                    final val topicDistributions: RDD[(Long, Vector)],
+                    final val documentToTopicMix: DataFrame,
+                    final val topicsMix: Matrix,
+                    final val wordToTopicMix: Map[String, Array[Double]])
+
+object SpotLDAResult {
+
+  def apply(helper: SpotLDAHelper, topicDistributions: RDD[(Long, Vector)], topicsMix: Matrix): SpotLDAResult = {
+
+    val documentToTopicMix: DataFrame = helper.formatDocumentDistribution(topicDistributions)
+    val wordToTopicMix: Map[String, Array[Double]] = helper.formatTopicDistributions(topicsMix)
+
+    new SpotLDAResult(helper, topicDistributions, documentToTopicMix, topicsMix, wordToTopicMix)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
index 122e8ed..7a8b67e 100644
--- a/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/lda/SpotLDAWrapper.scala
@@ -18,19 +18,15 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.Logger
-import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.clustering.{LDAModel, _}
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spot.lda.SpotLDAWrapperSchema._
-import org.apache.spot.utilities.FloatPointPrecisionUtility
-
-import scala.collection.immutable.Map
+import org.apache.spark.sql.SparkSession
 
 /**
   * Spark LDA implementation
-  * Contains routines for LDA using Scala Spark implementation from mllib
+  * Contains routines for LDA using Scala Spark implementation from org.apache.spark.mllib.clustering
   * 1. Creates list of unique documents, words and model based on those two
   * 2. Processes the model using Spark LDA
   * 3. Reads Spark LDA results: Topic distributions per document (docTopicDist) and word distributions per topic (wordTopicMat)
@@ -42,8 +38,6 @@ object SpotLDAWrapper {
   /**
     * Runs Spark LDA and returns a new model.
     *
-    * @param sparkSession       the SparkSession
-    * @param docWordCount       RDD with document list and the word count for each document (corpus)
     * @param topicCount         number of topics to find
     * @param logger             application logger
     * @param ldaSeed            LDA seed
@@ -51,51 +45,20 @@ object SpotLDAWrapper {
     * @param ldaBeta            topic concentration
     * @param ldaOptimizerOption LDA optimizer, em or online
     * @param maxIterations      maximum number of iterations for the optimizer
-    * @param precisionUtility   FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
     * @return
     */
-  def runLDA(sparkSession: SparkSession,
-             docWordCount: RDD[SpotLDAInput],
-             topicCount: Int,
-             logger: Logger,
-             ldaSeed: Option[Long],
-             ldaAlpha: Double,
-             ldaBeta: Double,
-             ldaOptimizerOption: String,
-             maxIterations: Int,
-             precisionUtility: FloatPointPrecisionUtility): SpotLDAOutput = {
-
-    import sparkSession.implicits._
+  def run(topicCount: Int,
+          logger: Logger,
+          ldaSeed: Option[Long],
+          ldaAlpha: Double,
+          ldaBeta: Double,
+          ldaOptimizerOption: String,
+          maxIterations: Int,
+          helper: SpotLDAHelper): SpotLDAModel = {
 
-    val docWordCountCache = docWordCount.cache()
-
-    // Forcing an action to cache results.
-    docWordCountCache.count()
-
-    // Create word Map Word,Index for further usage
-    val wordDictionary: Map[String, Int] = {
-      val words = docWordCountCache
-        .map({ case SpotLDAInput(doc, word, count) => word })
-        .distinct
-        .collect
-      words.zipWithIndex.toMap
-    }
-
-    val documentDictionary: DataFrame = docWordCountCache
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex
-      .toDF(DocumentName, DocumentNumber)
-      .cache
 
     // Structure corpus so that the index is the docID, values are the vectors of word occurrences in that doc
-    val ldaCorpus: RDD[(Long, Vector)] =
-      formatSparkLDAInput(docWordCountCache,
-        documentDictionary,
-        wordDictionary,
-        sparkSession)
-
-    docWordCountCache.unpersist()
+    val ldaCorpus: RDD[(Long, Vector)] = helper.formattedCorpus
 
     // Instantiate optimizer based on input
     val ldaOptimizer = ldaOptimizerOption match {
@@ -121,162 +84,35 @@ object SpotLDAWrapper {
         .setBeta(ldaBeta)
         .setOptimizer(ldaOptimizer)
 
-    // If caller does not provide seed to lda, ie. ldaSeed is empty, lda is seeded automatically set to hash value of class name
-
+    // If the caller does not provide a seed (i.e. ldaSeed is empty),
+    // lda is automatically seeded with the hash value of the class name
     if (ldaSeed.nonEmpty) {
       lda.setSeed(ldaSeed.get)
     }
 
-    val (wordTopicMat, docTopicDist) = ldaOptimizer match {
-      case _: EMLDAOptimizer => {
-        val ldaModel = lda.run(ldaCorpus).asInstanceOf[DistributedLDAModel]
-
-        // Get word topic mix, from Spark documentation:
-        // Inferred topics, where each topic is represented by a distribution over terms.
-        // This is a matrix of size vocabSize x k, where each column is a topic.
-        // No guarantees are given about the ordering of the topics.
-        val wordTopicMat: Matrix = ldaModel.topicsMatrix
-
-        // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
-        // i is the fraction of the document which belongs to topic i
-        val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions
-
-        (wordTopicMat, docTopicDist)
-
-      }
-
-      case _: OnlineLDAOptimizer => {
-        val ldaModel = lda.run(ldaCorpus).asInstanceOf[LocalLDAModel]
-
-        // Get word topic mix, from Spark documentation:
-        // Inferred topics, where each topic is represented by a distribution over terms.
-        // This is a matrix of size vocabSize x k, where each column is a topic.
-        // No guarantees are given about the ordering of the topics.
-        val wordTopicMat: Matrix = ldaModel.topicsMatrix
-
-        // Topic distribution: for each document, return distribution (vector) over topics for that docs where entry
-        // i is the fraction of the document which belongs to topic i
-        val docTopicDist: RDD[(Long, Vector)] = ldaModel.topicDistributions(ldaCorpus)
-
-        (wordTopicMat, docTopicDist)
-
-      }
-
-    }
-
-    // Create doc results from vector: convert docID back to string, convert vector of probabilities to array
-    val docToTopicMixDF =
-      formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession, precisionUtility)
+    val model: LDAModel = lda.run(ldaCorpus)
 
-    documentDictionary.unpersist()
-
-    // Create word results from matrix: convert matrix to sequence, wordIDs back to strings, sequence of
-    // probabilities to array
-    val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
-    val wordResults = formatSparkLDAWordOutput(wordTopicMat, revWordMap)
-
-    // Create output object
-    SpotLDAOutput(docToTopicMixDF, wordResults)
+    SpotLDAModel(model)
   }
 
   /**
-    * Formats input data for LDA algorithm
+    * Load an existing model from HDFS location.
     *
-    * @param docWordCount       RDD with document list and the word count for each document (corpus)
-    * @param documentDictionary DataFrame with a distinct list of documents and its id
-    * @param wordDictionary     immutable Map with distinct list of word and its id
-    * @param sparkSession       the SparkSession
-    * @return
+    * @param sparkSession       the Spark session.
+    * @param location           the HDFS location for the model.
+    * @param ldaOptimizerOption LDA optimizer, em or online.
+    * @return SpotLDAModel
     */
-  def formatSparkLDAInput(docWordCount: RDD[SpotLDAInput],
-                          documentDictionary: DataFrame,
-                          wordDictionary: Map[String, Int],
-                          sparkSession: SparkSession): RDD[(Long, Vector)] = {
-
-    import sparkSession.implicits._
+  def load(sparkSession: SparkSession, location: String, ldaOptimizerOption: String): SpotLDAModel = {
+    val sparkContext: SparkContext = sparkSession.sparkContext
 
-    val getWordId = {
-      udf((word: String) => (wordDictionary(word)))
+    val model = ldaOptimizerOption match {
+      case "em" => DistributedLDAModel.load(sparkContext, location)
+      case "online" => LocalLDAModel.load(sparkContext, location)
+      case _ => throw new IllegalArgumentException(
+        s"Invalid LDA optimizer $ldaOptimizerOption")
     }
 
-    val docWordCountDF = docWordCount
-      .map({ case SpotLDAInput(doc, word, count) => (doc, word, count) })
-      .toDF(DocumentName, WordName, WordNameWordCount)
-
-    // Convert SpotSparkLDAInput into desired format for Spark LDA: (doc, word, count) -> word count per doc, where RDD
-    // is indexed by DocID
-    val wordCountsPerDocDF = docWordCountDF
-      .join(documentDictionary, docWordCountDF(DocumentName) === documentDictionary(DocumentName))
-      .drop(documentDictionary(DocumentName))
-      .withColumn(WordNumber, getWordId(docWordCountDF(WordName)))
-      .drop(WordName)
-
-    val wordCountsPerDoc: RDD[(Long, Iterable[(Int, Double)])]
-    = wordCountsPerDocDF
-      .select(DocumentNumber, WordNumber, WordNameWordCount)
-      .rdd
-      .map({ case Row(documentId: Long, wordId: Int, wordCount: Int) => (documentId.toLong, (wordId, wordCount.toDouble)) })
-      .groupByKey
-
-    // Sum of distinct words in each doc (words will be repeated between different docs), used for sparse vec size
-    val numUniqueWords = wordDictionary.size
-    val ldaInput: RDD[(Long, Vector)] = wordCountsPerDoc
-      .mapValues({ case vs => Vectors.sparse(numUniqueWords, vs.toSeq) })
-
-    ldaInput
-  }
-
-  /**
-    * Format LDA output topicMatrix for spot-ml scoring
-    *
-    * @param wordTopMat LDA model topicMatrix
-    * @param wordMap    immutable Map with distinct list of word and its id
-    * @return
-    */
-  def formatSparkLDAWordOutput(wordTopMat: Matrix, wordMap: Map[Int, String]): scala.Predef.Map[String, Array[Double]] = {
-
-    // incoming word top matrix is in column-major order and the columns are unnormalized
-    val m = wordTopMat.numRows
-    val n = wordTopMat.numCols
-    val columnSums: Array[Double] = Range(0, n).map(j => (Range(0, m).map(i => wordTopMat(i, j)).sum)).toArray
-
-    val wordProbs: Seq[Array[Double]] = wordTopMat.transpose.toArray.grouped(n).toSeq
-      .map(unnormProbs => unnormProbs.zipWithIndex.map({ case (u, j) => u / columnSums(j) }))
-
-    wordProbs.zipWithIndex.map({ case (topicProbs, wordInd) => (wordMap(wordInd), topicProbs) }).toMap
+    SpotLDAModel(model)
   }
-
-  /**
-    * Format LDA output topicDistribution for spot-ml scoring
-    *
-    * @param docTopDist         LDA model topicDistribution
-    * @param documentDictionary DataFrame with a distinct list of documents and its id
-    * @param sparkSession       the SparkSession
-    * @param precisionUtility   FloatPointPrecisionUtility implementation based on user configuration (64 or 32 bit)
-    * @return
-    */
-  def formatSparkLDADocTopicOutput(docTopDist: RDD[(Long, Vector)], documentDictionary: DataFrame, sparkSession: SparkSession,
-                                   precisionUtility: FloatPointPrecisionUtility):
-  DataFrame = {
-    import sparkSession.implicits._
-
-    val topicDistributionToArray = udf((topicDistribution: Vector) => topicDistribution.toArray)
-    val documentToTopicDistributionDF = docTopDist.toDF(DocumentNumber, TopicProbabilityMix)
-
-    val documentToTopicDistributionArray = documentToTopicDistributionDF
-      .join(documentDictionary, documentToTopicDistributionDF(DocumentNumber) === documentDictionary(DocumentNumber))
-      .drop(documentDictionary(DocumentNumber))
-      .drop(documentToTopicDistributionDF(DocumentNumber))
-      .select(DocumentName, TopicProbabilityMix)
-      .withColumn(TopicProbabilityMixArray, topicDistributionToArray(documentToTopicDistributionDF(TopicProbabilityMix)))
-      .selectExpr(s"$DocumentName  AS $DocumentName", s"$TopicProbabilityMixArray AS $TopicProbabilityMix")
-
-    precisionUtility.castColumn(documentToTopicDistributionArray, TopicProbabilityMix)
-  }
-
-  case class SpotLDAInput(doc: String, word: String, count: Int) extends Serializable
-
-  case class SpotLDAOutput(docToTopicMix: DataFrame, wordResults: Map[String, Array[Double]])
-
 }
\ No newline at end of file
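
A short sketch of the new persistence path: save a trained model and reload it with SpotLDAWrapper.load, where the optimizer option must match the one used for training ("em" reloads a DistributedLDAModel, "online" a LocalLDAModel). hdfsModelPath is a hypothetical location; model and spotLDAHelper are assumed to come from the training flow shown earlier.

    // Persist the trained model to HDFS, then reload and score with it.
    val hdfsModelPath = "hdfs:///user/spot/lda/model"
    model.save(sparkSession, hdfsModelPath)

    val reloaded: SpotLDAModel = SpotLDAWrapper.load(sparkSession, hdfsModelPath, config.ldaOptimizer)
    val reloadedResults: SpotLDAResult = reloaded.predict(spotLDAHelper)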

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
index 6be11e1..4e09616 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/model/FlowSuspiciousConnectsModel.scala
@@ -20,11 +20,10 @@ package org.apache.spot.netflow.model
 import org.apache.log4j.Logger
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
 import org.apache.spot.netflow.FlowSchema._
 import org.apache.spot.netflow.FlowWordCreator
 import org.apache.spot.utilities.FloatPointPrecisionUtility
@@ -36,7 +35,7 @@ import org.apache.spot.utilities.data.validation.InvalidDataHandler
   * The model uses a topic-modelling approach that:
   * 1. Simplifies netflow records into words, one word at the source IP and another (possibly different) at the
   * destination IP.
-  * 2. The netflow words about each IP are treated as collections of thes words.
+  * 2. The netflow words about each IP are treated as collections of these words.
   * 3. A topic modelling approach is used to infer a collection of "topics" that represent common profiles
   * of network traffic. These "topics" are probability distributions on words.
   * 4. Each IP has a mix of topics corresponding to its behavior.
@@ -112,7 +111,7 @@ class FlowSuspiciousConnectsModel(topicCount: Int,
 }
 
 /**
-  * Contains dataframe schema information as well as the train-from-dataframe routine
+  * Contains DataFrame schema information as well as the train-from-dataframe routine
   * (which is a kind of factory routine) for [[FlowSuspiciousConnectsModel]] instances.
   *
   */
@@ -127,7 +126,7 @@ object FlowSuspiciousConnectsModel {
     IbytField,
     IpktField))
 
-  val ModelColumns = ModelSchema.fieldNames.toList.map(col)
+  val ModelColumns: List[Column] = ModelSchema.fieldNames.toList.map(col)
 
 
   def trainModel(sparkSession: SparkSession,
@@ -146,13 +145,12 @@ object FlowSuspiciousConnectsModel {
       config.duplicationFactor))
 
 
+    import sparkSession.implicits._
     // simplify netflow log entries into "words"
 
     val dataWithWords = totalRecords.withColumn(SourceWord, FlowWordCreator.srcWordUDF(ModelColumns: _*))
       .withColumn(DestinationWord, FlowWordCreator.dstWordUDF(ModelColumns: _*))
 
-    import sparkSession.implicits._
-
     // Aggregate per-word counts at each IP
     val srcWordCounts = dataWithWords
       .filter(dataWithWords(SourceWord).notEqual(InvalidDataHandler.WordError))
@@ -173,20 +171,19 @@ object FlowSuspiciousConnectsModel {
         .reduceByKey(_ + _)
         .map({ case ((ip, word), count) => SpotLDAInput(ip, word, count) })
 
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(ipWordCounts, config.precisionUtility, sparkSession)
 
-    val SpotLDAOutput(ipToTopicMix, wordToPerTopicProb) = SpotLDAWrapper.runLDA(sparkSession,
-      ipWordCounts,
-      config.topicCount,
+    val model = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
+
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
 
-    new FlowSuspiciousConnectsModel(config.topicCount,
-      ipToTopicMix,
-      wordToPerTopicProb)
+    new FlowSuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
index 3ef60af..7332fe4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsModel.scala
@@ -25,9 +25,8 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.SuspiciousConnectsScoreFunction
-import org.apache.spot.lda.SpotLDAWrapper
-import org.apache.spot.lda.SpotLDAWrapper.{SpotLDAInput, SpotLDAOutput}
 import org.apache.spot.lda.SpotLDAWrapperSchema._
+import org.apache.spot.lda.{SpotLDAHelper, SpotLDAInput, SpotLDAResult, SpotLDAWrapper}
 import org.apache.spot.proxy.ProxySchema._
 import org.apache.spot.utilities._
 import org.apache.spot.utilities.data.validation.InvalidDataHandler
@@ -92,7 +91,7 @@ class ProxySuspiciousConnectsModel(topicCount: Int,
   */
 object ProxySuspiciousConnectsModel {
 
-  // These buckets are optimized to datasets used for training. Last bucket is of infinite size to ensure fit.
+  // These buckets are optimized to data sets used for training. Last bucket is of infinite size to ensure fit.
   // The maximum value of entropy is given by log k where k is the number of distinct categories.
   // Given that the alphabet and number of characters is finite the maximum value for entropy is upper bounded.
   // Bucket number and size can be changed to provide less/more granularity
@@ -119,8 +118,8 @@ object ProxySuspiciousConnectsModel {
     * for clustering in the topic model.
     *
     * @param sparkSession Spark Session
-    * @param logger       Logge object.
-    * @param config       SuspiciousConnetsArgumnetParser.Config object containg CLI arguments.
+    * @param logger       Logger object.
+    * @param config       SuspiciousConnectsArgumentParser.Config object containing CLI arguments.
     * @param inputRecords Dataframe for training data, with columns Host, Time, ReqMethod, FullURI, ResponseContentType,
     *                     UserAgent, RespCode (as defined in ProxySchema object).
     * @return ProxySuspiciousConnectsModel
@@ -130,7 +129,7 @@ object ProxySuspiciousConnectsModel {
                  config: SuspiciousConnectsConfig,
                  inputRecords: DataFrame): ProxySuspiciousConnectsModel = {
 
-    logger.info("training new proxy suspcious connects model")
+    logger.info("training new proxy suspicious connects model")
 
 
     val selectedRecords =
@@ -145,24 +144,24 @@ object ProxySuspiciousConnectsModel {
         .reduceByKey(_ + _).collect()
         .toMap
 
-    val agentToCountBC = sparkSession.sparkContext.broadcast(agentToCount)
-
     val docWordCount: RDD[SpotLDAInput] =
       getIPWordCounts(sparkSession, logger, selectedRecords, config.feedbackFile, config.duplicationFactor,
         agentToCount)
 
-    val SpotLDAOutput(ipToTopicMixDF, wordResults) = SpotLDAWrapper.runLDA(sparkSession,
-      docWordCount,
-      config.topicCount,
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(docWordCount, config.precisionUtility, sparkSession)
+
+    val model = SpotLDAWrapper.run(config.topicCount,
       logger,
       config.ldaPRGSeed,
       config.ldaAlpha,
       config.ldaBeta,
       config.ldaOptimizer,
       config.ldaMaxiterations,
-      config.precisionUtility)
+      spotLDAHelper)
+
+    val results: SpotLDAResult = model.predict(spotLDAHelper)
 
-    new ProxySuspiciousConnectsModel(config.topicCount, ipToTopicMixDF, wordResults)
+    new ProxySuspiciousConnectsModel(config.topicCount, results.documentToTopicMix, results.wordToTopicMix)
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
index 0183027..083cfe7 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/TopDomains.scala
@@ -28,7 +28,6 @@ object TopDomains {
 
   val TopDomains: Set[String] = Source.fromFile(alexaTop1MPath).getLines.map(line => {
     val parts = line.split(",")
-    val l = parts.length
     parts(1).split('.')(0)
   }).toSet
 }
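
For reference, a small illustration of the parsing that remains above, assuming the Alexa top-1M file uses "rank,domain" lines (the input line here is hypothetical):

    val line = "1,google.com"
    val parts = line.split(",")              // Array("1", "google.com")
    val topDomain = parts(1).split('.')(0)   // "google"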

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
new file mode 100644
index 0000000..93828b2
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAHelperTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spot.lda
+
+import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spot.lda.SpotLDAWrapperSchema.TopicProbabilityMix
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
+import org.scalatest.Matchers
+
+/**
+  * Created by rabarona on 7/17/17.
+  */
+class SpotLDAHelperTest extends TestingSparkContextFlatSpec with Matchers {
+
+  "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
+    "is the docID, values are the vectors of word occurrences in that doc" in {
+
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkLDAInput: RDD[(Long, Vector)] = spotLDAHelper.formattedCorpus
+    val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
+
+    sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(5.0, 8.0))), (2, Vectors.sparse(4, Array
+    (1), Array(2.0))), (1, Vectors.sparse(4, Array(2), Array(4.0))))
+  }
+
+  "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
+    "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+    import testImplicits._
+    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
+      docProbabilities)
+    }).collect
+
+    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
+
+    documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
+    documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
+    documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
+
+    documentProbabilities(0) shouldBe a[java.lang.Double]
+
+  }
+
+  it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
+    "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
+      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
+      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
+      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility32, sparkSession)
+
+    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
+      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
+        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
+        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
+
+    val sparkDocRes: DataFrame = spotLDAHelper.formatDocumentDistribution(docTopicDist)
+
+    import testImplicits._
+    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
+      docProbabilities)
+    }).collect
+
+    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
+
+    documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
+    documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
+    documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
+
+    documentProbabilities(0) shouldBe a[java.lang.Float]
+  }
+
+  "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
+    "to strings, and sequence of probabilities to array" in {
+
+    val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
+
+    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "23.0_7.0_7.0_4.0", 8),
+      SpotLDAInput("10.10.98.123", "80.0_7.0_7.0_4.0", 4),
+      SpotLDAInput("66.23.45.11", "333333.0_7.0_7.0_4.0", 2),
+      SpotLDAInput("192.168.1.2", "-1_23.0_7.0_7.0_4.0", 5)))
+
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(documentWordData, FloatPointPrecisionUtility64, sparkSession)
+
+    val sparkWordRes = spotLDAHelper.formatTopicDistributions(testMat)
+
+    sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
+    sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/45c03ab6/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
index 5c40068..7007ba1 100644
--- a/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/lda/SpotLDAWrapperTest.scala
@@ -18,25 +18,18 @@
 package org.apache.spot.lda
 
 import org.apache.log4j.{Level, LogManager}
-import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spot.lda.SpotLDAWrapper._
 import org.apache.spot.lda.SpotLDAWrapperSchema._
 import org.apache.spot.testutils.TestingSparkContextFlatSpec
 import org.apache.spot.utilities.{FloatPointPrecisionUtility32, FloatPointPrecisionUtility64}
 import org.scalatest.Matchers
 
-import scala.collection.immutable.Map
-
 class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
 
   "SparkLDA" should "handle an extremely unbalanced two word doc with EM optimizer" in {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -46,16 +39,20 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Double]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -65,9 +62,9 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.2
-    val ldaBeta = 1.001
-    val ldaMaxIterations = 20
+    val ldaAlpha = 1.002
+    val ldaBeta = 1.0001
+    val ldaMaxIterations = 100
 
     val optimizer = "em"
 
@@ -75,20 +72,24 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer ,ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
     val dogTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
     val catTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -98,7 +99,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  0.0009
+    val ldaAlpha = 0.0009
     val ldaBeta = 0.00001
     val ldaMaxIterations = 400
 
@@ -108,16 +109,20 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Double]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0) * catTopics(0) + topicMix(1) * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0) * dogTopics(0) + topicMix(1) * dogTopics(1))) should be < 0.01
@@ -127,7 +132,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  0.0009
+    val ldaAlpha = 0.0009
     val ldaBeta = 0.00001
     val ldaMaxIterations = 400
     val optimizer = "online"
@@ -136,20 +141,25 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility64)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility64, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
+
     val dogTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
     val catTopicMix: Array[Double] =
       topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first()
-        .toSeq(0).asInstanceOf[Seq[Double]].toArray
+        .toSeq.head.asInstanceOf[Seq[Double]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
@@ -159,7 +169,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -169,16 +179,20 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val dogWorld = SpotLDAInput("pets", "dog", 999)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
 
     val topicMix =
-      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "pets").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(topicMix(0).toDouble * catTopics(0) + topicMix(1).toDouble * catTopics(1)) should be < 0.01
     Math.abs(0.999 - (topicMix(0).toDouble * dogTopics(0) + topicMix(1).toDouble * dogTopics(1))) should be < 0.01
@@ -188,7 +202,7 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
     logger.setLevel(Level.WARN)
 
-    val ldaAlpha =  1.02
+    val ldaAlpha = 1.02
     val ldaBeta = 1.001
     val ldaMaxIterations = 20
 
@@ -198,134 +212,28 @@ class SpotLDAWrapperTest extends TestingSparkContextFlatSpec with Matchers {
     val dogWorld = SpotLDAInput("dog world", "dog", 1)
 
     val data = sparkSession.sparkContext.parallelize(Seq(catFancy, dogWorld))
-    val out = SpotLDAWrapper.runLDA(sparkSession, data, 2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
-      optimizer, ldaMaxIterations, FloatPointPrecisionUtility32)
 
-    val topicMixDF = out.docToTopicMix
+    val spotLDAHelper: SpotLDAHelper = SpotLDAHelper(data, FloatPointPrecisionUtility32, sparkSession)
+    val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), ldaAlpha, ldaBeta,
+      optimizer, ldaMaxIterations, spotLDAHelper)
+
+    val results = model.predict(spotLDAHelper)
+
+    val topicMixDF = results.documentToTopicMix
+
     val dogTopicMix: Array[Float] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "dog world").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
 
     val catTopicMix: Array[Float] =
-      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq(0)
+      topicMixDF.filter(topicMixDF(DocumentName) === "cat fancy").select(TopicProbabilityMix).first().toSeq.head
         .asInstanceOf[Seq[Float]].toArray
 
-    val catTopics = out.wordResults("cat")
-    val dogTopics = out.wordResults("dog")
+    val catTopics = results.wordToTopicMix("cat")
+    val dogTopics = results.wordToTopicMix("dog")
 
     Math.abs(1 - (catTopicMix(0) * catTopics(0) + catTopicMix(1) * catTopics(1))) should be < 0.01
     Math.abs(1 - (dogTopicMix(0) * dogTopics(0) + dogTopicMix(1) * dogTopics(1))) should be < 0.01
   }
 
-  "formatSparkLDAInput" should "return input in RDD[(Long, Vector)] (collected as Array for testing) format. The index " +
-    "is the docID, values are the vectors of word occurrences in that doc" in {
-
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val wordDictionary = Map("333333_7.0_0.0_1.0" -> 0, "1111111_6.0_3.0_5.0" -> 1, "-1_43_7.0_2.0_6.0" -> 2,
-      "-1_80_6.0_1.0_1.0" -> 3)
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-
-    val sparkLDAInput: RDD[(Long, Vector)] = SpotLDAWrapper.formatSparkLDAInput(documentWordData,
-      documentDictionary, wordDictionary, sparkSession)
-    val sparkLDAInArr: Array[(Long, Vector)] = sparkLDAInput.collect()
-
-    sparkLDAInArr shouldBe Array((0, Vectors.sparse(4, Array(0, 3), Array(8.0, 5.0))), (2, Vectors.sparse(4, Array
-    (2), Array(2.0))), (1, Vectors.sparse(4, Array(1), Array(4.0))))
-  }
-
-  "formatSparkLDADocTopicOutput" should "return RDD[(String,Array(Double))] after converting doc results from vector " +
-    "using PrecisionUtilityDouble: convert docID back to string, convert vector of probabilities to array" in {
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
-      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
-        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
-        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
-    val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
-      FloatPointPrecisionUtility64)
-
-    import testImplicits._
-    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Double]) => (documentName,
-      docProbabilities)
-    }).collect
-
-    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Double]]
-
-    documents should contain("192.168.1.1", Seq(0.15, 0.3, 0.5, 0.05))
-    documents should contain("10.10.98.123", Seq(0.25, 0.15, 0.4, 0.2))
-    documents should contain("66.23.45.11", Seq(0.4, 0.1, 0.3, 0.2))
-
-    documentProbabilities(0) shouldBe a[java.lang.Double]
-
-  }
-
-  it should "return RDD[(String,Array(Float))] after converting doc results from vector " +
-    "using PrecisionUtilityFloat: convert docID back to string, convert vector of probabilities to array" in {
-
-    val documentWordData = sparkSession.sparkContext.parallelize(Seq(SpotLDAInput("192.168.1.1", "333333_7.0_0.0_1.0", 8),
-      SpotLDAInput("10.10.98.123", "1111111_6.0_3.0_5.0", 4),
-      SpotLDAInput("66.23.45.11", "-1_43_7.0_2.0_6.0", 2),
-      SpotLDAInput("192.168.1.1", "-1_80_6.0_1.0_1.0", 5)))
-
-    val documentDictionary: DataFrame = sparkSession.createDataFrame(documentWordData
-      .map({ case SpotLDAInput(doc, word, count) => doc })
-      .distinct
-      .zipWithIndex.map({ case (d, c) => Row(d, c) }), StructType(List(DocumentNameField, DocumentNumberField)))
-
-    val docTopicDist: RDD[(Long, Vector)] = sparkSession.sparkContext.parallelize(
-      Array((0.toLong, Vectors.dense(0.15, 0.3, 0.5, 0.05)),
-        (1.toLong, Vectors.dense(0.25, 0.15, 0.4, 0.2)),
-        (2.toLong, Vectors.dense(0.4, 0.1, 0.3, 0.2))))
-
-    val sparkDocRes: DataFrame = formatSparkLDADocTopicOutput(docTopicDist, documentDictionary, sparkSession,
-      FloatPointPrecisionUtility32)
-
-    import testImplicits._
-    val documents = sparkDocRes.map({ case Row(documentName: String, docProbabilities: Seq[Float]) => (documentName,
-      docProbabilities)
-    }).collect
-
-    val documentProbabilities = sparkDocRes.select(TopicProbabilityMix).first.toSeq(0).asInstanceOf[Seq[Float]]
-
-    documents should contain("192.168.1.1", Seq(0.15f, 0.3f, 0.5f, 0.05f))
-    documents should contain("10.10.98.123", Seq(0.25f, 0.15f, 0.4f, 0.2f))
-    documents should contain("66.23.45.11", Seq(0.4f, 0.1f, 0.3f, 0.2f))
-
-    documentProbabilities(0) shouldBe a[java.lang.Float]
-  }
-
-  "formatSparkLDAWordOutput" should "return Map[Int,String] after converting word matrix to sequence, wordIDs back " +
-    "to strings, and sequence of probabilities to array" in {
-    val testMat = Matrices.dense(4, 4, Array(0.5, 0.2, 0.05, 0.25, 0.25, 0.1, 0.15, 0.5, 0.1, 0.4, 0.25, 0.25, 0.7, 0.2, 0.02, 0.08))
-
-    val wordDictionary = Map("-1_23.0_7.0_7.0_4.0" -> 3, "23.0_7.0_7.0_4.0" -> 0, "333333.0_7.0_7.0_4.0" -> 2, "80.0_7.0_7.0_4.0" -> 1)
-    val revWordMap: Map[Int, String] = wordDictionary.map(_.swap)
-
-    val sparkWordRes = formatSparkLDAWordOutput(testMat, revWordMap)
-
-    sparkWordRes should contain key ("23.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("80.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("333333.0_7.0_7.0_4.0")
-    sparkWordRes should contain key ("-1_23.0_7.0_7.0_4.0")
-  }
 }
\ No newline at end of file
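
----------------------------------------------------------------------

For reference, a minimal usage sketch of the decoupled LDA API that the updated tests above exercise. This is not part of the patch; it assumes a live SparkSession named sparkSession, a log4j Logger named logger, and the import locations shown (SpotLDAInput is assumed to be exposed from the org.apache.spot.lda package, FloatPointPrecisionUtility64 from org.apache.spot.utilities). Hyperparameter values are illustrative only.

  import org.apache.spot.lda._                                  // assumed: SpotLDAHelper, SpotLDAModel, SpotLDAResult, SpotLDAWrapper, SpotLDAInput
  import org.apache.spot.utilities.FloatPointPrecisionUtility64 // assumed package location

  // Corpus of (document, word, count) triples, as in the tests above.
  val corpus = sparkSession.sparkContext.parallelize(Seq(
    SpotLDAInput("cat fancy", "cat", 1),
    SpotLDAInput("dog world", "dog", 1)))

  // Wrap the corpus once; the helper now carries the document and word
  // dictionaries that runLDA previously rebuilt internally.
  val helper: SpotLDAHelper = SpotLDAHelper(corpus, FloatPointPrecisionUtility64, sparkSession)

  // Train a model, then score a helper-wrapped corpus against it.
  val model: SpotLDAModel = SpotLDAWrapper.run(2, logger, Some(0xdeadbeef), 1.02, 1.001, "em", 20, helper)
  val results: SpotLDAResult = model.predict(helper)

  val docTopics = results.documentToTopicMix     // DataFrame keyed by document name
  val catTopics = results.wordToTopicMix("cat")  // per-word topic probability mix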