You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by ev...@apache.org on 2017/01/25 18:36:45 UTC

[33/49] incubator-spot git commit: unit_test_cleanup

unit_test_cleanup

 CODE BASE IS NOT FUNCTIONAL AT THIS COMMIT

restructuring code for easier merge of data validation code with unit test changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/c638e2fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/c638e2fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/c638e2fa

Branch: refs/heads/master
Commit: c638e2fac593dcd91d900f5b350959a8bc0d49b2
Parents: 986cebf
Author: nlsegerl <na...@intel.com>
Authored: Wed Jan 4 11:31:19 2017 -0800
Committer: nlsegerl <na...@intel.com>
Committed: Wed Jan 4 11:31:19 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spot/SuspiciousConnects.scala    |  20 ++-
 .../dns/DNSSuspiciousConnectsAnalysis.scala     | 169 +++++++++++++++----
 .../FlowSuspiciousConnectsAnalysis.scala        | 116 ++++++++++---
 .../proxy/ProxySuspiciousConnectsAnalysis.scala | 150 ++++++++++++++--
 4 files changed, 387 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
index 8751189..dfd5b8f 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
@@ -7,7 +7,7 @@ import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSuspiciousConnectsAnalysis
 import org.apache.spot.netflow.FlowSuspiciousConnectsAnalysis
 import org.apache.spot.proxy.ProxySuspiciousConnectsAnalysis
-
+import org.apache.spot.utilities.data.InputOutputDataHandler
 
 /**
   * Top level entrypoint to execute suspicious connections analysis on network data.
@@ -43,13 +43,25 @@ object SuspiciousConnects {
         val sqlContext = new SQLContext(sparkContext)
         implicit val outputDelimiter = config.outputDelimiter
 
+        val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sqlContext, config.inputPath, logger)
+          .getOrElse(sqlContext.emptyDataFrame)
+        if(inputDataFrame.rdd.isEmpty()) {
+          logger.error("Couldn't read data from location " + config.inputPath +", please verify it's a valid location and that " +
+            s"contains parquet files with a given schema and try again.")
+          System.exit(0)
+        }
+
+
+
         analysis match {
-          case "flow" => FlowSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger)
-          case "dns" => DNSSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger)
-          case "proxy" => ProxySuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger)
+          case "flow" => FlowSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger, inputDataFrame)
+          case "dns" => DNSSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger, inputDataFrame)
+          case "proxy" => ProxySuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger, inputDataFrame)
           case _ => logger.error("Unsupported (or misspelled) analysis: " + analysis)
         }
 
+        InputOutputDataHandler.mergeResultsFiles(sparkContext, config.hdfsScoredConnect, analysis, logger)
+
         sparkContext.stop()
 
       case None => logger.error("Error parsing arguments")

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index 244b941..5db5c50 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -8,7 +8,11 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
+import org.apache.log4j.Logger
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.ModelSchema
+import org.apache.spot.proxy.ProxySchema.Score
+import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}
+
 
 /**
   * The suspicious connections analysis of DNS log data develops a probabilistic model the DNS queries
@@ -17,27 +21,6 @@ import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.ModelSchema
 
 object DNSSuspiciousConnectsAnalysis {
 
-  val inSchema = StructType(List(TimestampField, UnixTimestampField, FrameLengthField, ClientIPField,
-      QueryNameField, QueryClassField, QueryTypeField, QueryResponseCodeField))
-
-  val inColumns = inSchema.fieldNames.map(col)
-
-
-  assert(ModelSchema.fields.forall(inSchema.fields.contains(_)))
-
-  val OutSchema = StructType(
-    List(TimestampField,
-      UnixTimestampField,
-      FrameLengthField,
-      ClientIPField,
-      QueryNameField,
-      QueryClassField,
-      QueryTypeField,
-      QueryResponseCodeField,
-      ScoreField))
-
-  val OutColumns = OutSchema.fieldNames.map(col)
-
 
   /**
     * Run suspicious connections analysis on DNS log data.
@@ -48,31 +31,37 @@ object DNSSuspiciousConnectsAnalysis {
     * @param sqlContext
     * @param logger
     */
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
-    logger.info("Starting DNS suspicious connects analysis.")
+  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+          inputDNSRecords: DataFrame) = {
 
 
-    logger.info("Loading data")
+    logger.info("Starting DNS suspicious connects analysis.")
 
+    val userDomain = config.userDomain
 
-    val rawDataDF = sqlContext.read.parquet(config.inputPath)
-      .filter(Timestamp + " is not null and " + UnixTimestamp + " is not null")
-      .select(inColumns:_*)
+    val cleanDNSRecords = filterAndSelectCleanDNSRecords(inputDNSRecords)
 
+    logger.info("Training the model")
 
+    val scoredDNSRecords = detectDNSAnomalies(cleanDNSRecords, config, sparkContext, sqlContext, logger)
 
-    val scoredDF = detectDNSAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
+    val filteredDNSRecords = filterScoredDNSRecords(scoredDNSRecords, config.threshold)
 
+    val orderedDNSRecords = filteredDNSRecords.orderBy(Score)
 
+    val mostSuspiciousDNSRecords = if(config.maxResults > 0)  orderedDNSRecords.limit(config.maxResults) else orderedDNSRecords
 
-    val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
-    val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
+    val outputDNSRecords = mostSuspiciousDNSRecords.select(OutSchema:_*).sort(Score)
 
-    mostSusipiciousDF.select(OutColumns:_*).sort(Score)
+    logger.info("DNS  suspicious connects analysis completed.")
     logger.info("Saving results to : " + config.hdfsScoredConnect)
+    outputDNSRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
 
+    val invalidDNSRecords = filterAndSelectInvalidDNSRecords(inputDNSRecords)
+    dataValidation.showAndSaveInvalidRecords(invalidDNSRecords, config.hdfsScoredConnect, logger)
 
-    mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+    val corruptDNSRecords = filterAndSelectCorruptDNSRecords(scoredDNSRecords)
+    dataValidation.showAndSaveCorruptRecords(corruptDNSRecords, config.hdfsScoredConnect, logger)
   }
 
   /**
@@ -98,4 +87,120 @@ object DNSSuspiciousConnectsAnalysis {
     logger.info("Identifying outliers")
     model.score(sparkContext, sqlContext, data, userDomain)
   }
+
+  /**
+    * Keeps the DNS records that pass the validity checks below and selects the input columns from them.
+    * @param inputDNSRecords raw DNS records.
+    * @return passing records; nulls in QueryClass, QueryType and QueryResponseCode are replaced by the Default* values.
+    */
+  def filterAndSelectCleanDNSRecords(inputDNSRecords: DataFrame): DataFrame ={
+    // Required fields must be usable, and at least one of query class / type / response code must be present.
+    val cleanDNSRecordsFilter = inputDNSRecords(Timestamp).isNotNull &&
+      inputDNSRecords(Timestamp).notEqual("") &&
+      inputDNSRecords(Timestamp).notEqual("-") &&
+      inputDNSRecords(UnixTimestamp).isNotNull &&
+      inputDNSRecords(FrameLength).isNotNull &&
+      inputDNSRecords(QueryName).isNotNull &&
+      inputDNSRecords(QueryName).notEqual("") &&
+      inputDNSRecords(QueryName).notEqual("-") &&
+      inputDNSRecords(QueryName).notEqual("(empty)") &&
+      inputDNSRecords(ClientIP).isNotNull &&
+      inputDNSRecords(ClientIP).notEqual("") &&
+      inputDNSRecords(ClientIP).notEqual("-") &&
+      ((inputDNSRecords(QueryClass).isNotNull &&
+        inputDNSRecords(QueryClass).notEqual("") &&
+        inputDNSRecords(QueryClass).notEqual("-")) ||
+        inputDNSRecords(QueryType).isNotNull ||
+        inputDNSRecords(QueryResponseCode).isNotNull)
+
+    inputDNSRecords
+      .filter(cleanDNSRecordsFilter)
+      .select(InSchema: _*)
+      .na.fill(DefaultQueryClass, Seq(QueryClass))
+      .na.fill(DefaultQueryType, Seq(QueryType))
+      .na.fill(DefaultQueryResponseCode, Seq(QueryResponseCode))
+  }
+
+  /**
+    * Selects the DNS records rejected by filterAndSelectCleanDNSRecords (the negation of its filter).
+    * @param inputDNSRecords raw DNS records.
+    * @return records with a missing or placeholder required field, or with no query class, type or response code.
+    */
+  def filterAndSelectInvalidDNSRecords(inputDNSRecords: DataFrame): DataFrame ={
+    // De Morgan complement of the clean filter: each "is usable" test becomes "is unusable", && becomes ||.
+    val invalidDNSRecordsFilter = inputDNSRecords(Timestamp).isNull ||
+      inputDNSRecords(Timestamp).equalTo("") ||
+      inputDNSRecords(Timestamp).equalTo("-") ||
+      inputDNSRecords(UnixTimestamp).isNull ||
+      inputDNSRecords(FrameLength).isNull ||
+      inputDNSRecords(QueryName).isNull ||
+      inputDNSRecords(QueryName).equalTo("") ||
+      inputDNSRecords(QueryName).equalTo("-") ||
+      inputDNSRecords(QueryName).equalTo("(empty)") ||
+      inputDNSRecords(ClientIP).isNull ||
+      inputDNSRecords(ClientIP).equalTo("") ||
+      inputDNSRecords(ClientIP).equalTo("-") ||
+      ((inputDNSRecords(QueryClass).isNull ||
+        inputDNSRecords(QueryClass).equalTo("") ||
+        inputDNSRecords(QueryClass).equalTo("-")) &&
+        inputDNSRecords(QueryType).isNull &&
+        inputDNSRecords(QueryResponseCode).isNull)
+
+    inputDNSRecords
+      .filter(invalidDNSRecordsFilter)
+      .select(InSchema: _*)
+  }
+
+  /**
+    * Keeps the scored records whose score is at most the threshold and greater than dataValidation.ScoreError.
+    * @param scoredDNSRecords scored DNS records.
+    * @param threshold score tolerance.
+    * @return the scored records satisfying both score conditions, with all columns unchanged.
+    */
+  def filterScoredDNSRecords(scoredDNSRecords: DataFrame, threshold: Double): DataFrame ={
+    // NOTE(review): ScoreError looks like the sentinel score of records that failed scoring - confirm.
+    val filteredDNSRecordsFilter = scoredDNSRecords(Score).leq(threshold) &&
+      scoredDNSRecords(Score).gt(dataValidation.ScoreError)
+
+    scoredDNSRecords.filter(filteredDNSRecordsFilter)
+  }
+
+  /**
+    * Selects the scored records whose score equals dataValidation.ScoreError.
+    * @param scoredDNSRecords scored DNS records.
+    * @return the matching records, projected onto the OutSchema columns.
+    */
+  def filterAndSelectCorruptDNSRecords(scoredDNSRecords: DataFrame): DataFrame = {
+    // NOTE(review): ScoreError is presumably the marker for records that could not be scored - confirm.
+    val corruptDNSRecordsFilter = scoredDNSRecords(Score).equalTo(dataValidation.ScoreError)
+
+    scoredDNSRecords
+      .filter(corruptDNSRecordsFilter)
+      .select(OutSchema: _*)
+
+  }
+
+  // Values filterAndSelectCleanDNSRecords substitutes for nulls in the three optional query columns.
+  val DefaultQueryClass = "unknown"
+  val DefaultQueryType = -1
+  val DefaultQueryResponseCode = -1
+  // Fields read from the input DNS records; InSchema holds the same field names as Column selectors.
+  val InStructType = StructType(List(TimestampField, UnixTimestampField, FrameLengthField, ClientIPField,
+    QueryNameField, QueryClassField, QueryTypeField, QueryResponseCodeField))
+
+  val InSchema = InStructType.fieldNames.map(col)
+  // Fails when this object is initialized if the model needs a field that the input schema lacks.
+  assert(ModelSchema.fields.forall(InStructType.fields.contains(_)))
+  // Column selectors for the saved results: the eight input fields followed by the score field.
+  val OutSchema = StructType(
+    List(TimestampField,
+      UnixTimestampField,
+      FrameLengthField,
+      ClientIPField,
+      QueryNameField,
+      QueryClassField,
+      QueryTypeField,
+      QueryResponseCodeField,
+      ScoreField)).fieldNames.map(col)
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index 098a787..127b2a7 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -8,6 +8,7 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.netflow.FlowSchema._
 import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
+import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}
 
 
 /**
@@ -17,29 +18,33 @@ import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
 
 object FlowSuspiciousConnectsAnalysis {
 
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
+  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+          inputFlowRecords: DataFrame) = {
 
-    logger.info("Loading data")
+    logger.info("Starting flow suspicious connects analysis.")
 
-    val rawDataDF = sqlContext.read.parquet(config.inputPath)
-      .filter(Hour + " BETWEEN 0 AND 23 AND  " + Minute + " BETWEEN 0 AND 59 AND  " + Second + " BETWEEN 0 AND 59")
-      .select(inColumns: _*)
+    val cleanFlowRecords = filterAndSelectCleanFlowRecords(inputFlowRecords)
 
+    val scoredFlowRecords = detectFlowAnomalies(cleanFlowRecords, config, sparkContext, sqlContext, logger)
 
-    logger.info("Training the model")
+    val filteredFlowRecords = filterScoredFlowRecords(scoredFlowRecords, config.threshold)
 
-    val scoredDF = detectFlowAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
+    val orderedFlowRecords = filteredFlowRecords.orderBy(Score)
 
-    val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
+    val mostSuspiciousFlowRecords =
+      if(config.maxResults > 0 ) orderedFlowRecords.limit(config.maxResults) else orderedFlowRecords
 
-    val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
-
-
-    val outputDF = mostSusipiciousDF.select(OutColumns: _*)
+    val outputFlowRecords = mostSuspiciousFlowRecords.select(OutSchema: _*)
 
     logger.info("Netflow  suspicious connects analysis completed.")
     logger.info("Saving results to : " + config.hdfsScoredConnect)
-    outputDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+    outputFlowRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+
+    val invalidFlowRecords = filterAndSelectInvalidFlowRecords(inputFlowRecords)
+    dataValidation.showAndSaveInvalidRecords(invalidFlowRecords, config.hdfsScoredConnect, logger)
+
+    val corruptFlowRecords = filterAndSelectCorruptFlowRecords(scoredFlowRecords)
+    dataValidation.showAndSaveCorruptRecords(corruptFlowRecords, config.hdfsScoredConnect, logger)
   }
 
   /**
@@ -67,7 +72,83 @@ object FlowSuspiciousConnectsAnalysis {
     model.score(sparkContext, sqlContext, data)
   }
 
-  val inSchema = StructType(List(TimeReceivedField,
+  /**
+    * Keeps the flow records with an in-range time of day and non-null required fields.
+    * @param inputFlowRecords raw flow records
+    * @return the passing records, projected onto the InSchema columns.
+    */
+  def filterAndSelectCleanFlowRecords(inputFlowRecords: DataFrame): DataFrame ={
+    // Hour must be 0-23, minute and second 0-59; the remaining listed fields only need to be non-null.
+    val cleanFlowRecordsFilter = inputFlowRecords(Hour).between(0, 23) &&
+      inputFlowRecords(Minute).between(0, 59) &&
+      inputFlowRecords(Second).between(0, 59) &&
+      inputFlowRecords(TimeReceived).isNotNull &&
+      inputFlowRecords(SourceIP).isNotNull &&
+      inputFlowRecords(DestinationIP).isNotNull &&
+      inputFlowRecords(SourcePort).isNotNull &&
+      inputFlowRecords(DestinationPort).isNotNull &&
+      inputFlowRecords(Ibyt).isNotNull &&
+      inputFlowRecords(Ipkt).isNotNull
+
+    inputFlowRecords
+      .filter(cleanFlowRecordsFilter)
+      .select(InSchema: _*)
+
+  }
+
+  /**
+    * Selects the flow records rejected by filterAndSelectCleanFlowRecords (the complement of its filter).
+    * @param inputFlowRecords raw flow records.
+    * @return records with a null or out-of-range hour, minute or second, or a null required field.
+    */
+  def filterAndSelectInvalidFlowRecords(inputFlowRecords: DataFrame): DataFrame = {
+    val invalidFlowRecordsFilter =
+      inputFlowRecords(Hour).isNull || !inputFlowRecords(Hour).between(0, 23) ||
+      inputFlowRecords(Minute).isNull || !inputFlowRecords(Minute).between(0, 59) ||
+      inputFlowRecords(Second).isNull || !inputFlowRecords(Second).between(0, 59) ||
+      inputFlowRecords(TimeReceived).isNull ||
+      inputFlowRecords(SourceIP).isNull ||
+      inputFlowRecords(DestinationIP).isNull ||
+      inputFlowRecords(SourcePort).isNull ||
+      inputFlowRecords(DestinationPort).isNull ||
+      inputFlowRecords(Ibyt).isNull ||
+      inputFlowRecords(Ipkt).isNull
+
+    inputFlowRecords
+      .filter(invalidFlowRecordsFilter)
+      .select(InSchema: _*)
+  }
+
+  /**
+    * Keeps the scored records whose score is at most the threshold and greater than dataValidation.ScoreError.
+    * @param scoredFlowRecords scored flow records.
+    * @param threshold score tolerance.
+    * @return the scored records satisfying both score conditions, with all columns unchanged.
+    */
+  def filterScoredFlowRecords(scoredFlowRecords: DataFrame, threshold: Double): DataFrame = {
+    // NOTE(review): ScoreError looks like the sentinel score of records that failed scoring - confirm.
+    val filteredFlowRecordsFilter = scoredFlowRecords(Score).leq(threshold) &&
+      scoredFlowRecords(Score).gt(dataValidation.ScoreError)
+
+    scoredFlowRecords.filter(filteredFlowRecordsFilter)
+  }
+
+  /**
+    * Selects the scored records whose score equals dataValidation.ScoreError.
+    * @param scoredFlowRecords scored flow records.
+    * @return the matching records, projected onto the OutSchema columns.
+    */
+  def filterAndSelectCorruptFlowRecords(scoredFlowRecords: DataFrame): DataFrame = {
+    // NOTE(review): ScoreError is presumably the marker for records that could not be scored - confirm.
+    val corruptFlowRecordsFilter = scoredFlowRecords(Score).equalTo(dataValidation.ScoreError)
+
+    scoredFlowRecords
+      .filter(corruptFlowRecordsFilter)
+      .select(OutSchema: _*)
+
+  }
+
+  val InSchema = StructType(List(TimeReceivedField,
     YearField,
     MonthField,
     DayField,
@@ -83,9 +164,7 @@ object FlowSuspiciousConnectsAnalysis {
     IpktField,
     IbytField,
     OpktField,
-    ObytField))
-
-  val inColumns = inSchema.fieldNames.map(col)
+    ObytField)).fieldNames.map(col)
 
   val OutSchema = StructType(
     List(TimeReceivedField,
@@ -105,7 +184,6 @@ object FlowSuspiciousConnectsAnalysis {
       IbytField,
       OpktField,
       ObytField,
-      ScoreField))
+      ScoreField)).fieldNames.map(col)
 
-  val OutColumns = OutSchema.fieldNames.map(col)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 38150ca..290f101 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -2,10 +2,12 @@ package org.apache.spot.proxy
 
 import org.apache.log4j.Logger
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.proxy.ProxySchema._
-
+import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}
 /**
   * Run suspicious connections analysis on proxy data.
   */
@@ -19,27 +21,35 @@ object ProxySuspiciousConnectsAnalysis {
     * @param sqlContext   Spark SQL context.
     * @param logger       Logs execution progress, information and errors for user.
     */
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
+  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+          inputProxyRecords: DataFrame) = {
 
     logger.info("Starting proxy suspicious connects analysis.")
 
-    logger.info("Loading data from: " + config.inputPath)
+    val cleanProxyRecords = filterAndSelectCleanProxyRecords(inputProxyRecords)
+
+
+    val scoredProxyRecords = detectProxyAnomalies(cleanProxyRecords, config, sparkContext, sqlContext, logger)
+
+    // take the maxResults least probable events of probability below the threshold and sort
+
+    val filteredProxyRecords = filterScoredProxyRecords(scoredProxyRecords, config.threshold)
 
-    val rawDataDF = sqlContext.read.parquet(config.inputPath).
-      filter(Date + " is not null and " + Time + " is not null and " + ClientIP + " is not null").
-      select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, Duration, UserName,
-        WebCat, Referer, RespCode, URIPort, URIPath, URIQuery, ServerIP, SCBytes, CSBytes, FullURI)
+    val orderedProxyRecords = filteredProxyRecords.orderBy(Score)
 
-    val scoredDF = detectProxyAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
+    val mostSuspiciousProxyRecords = if(config.maxResults > 0)  orderedProxyRecords.limit(config.maxResults) else orderedProxyRecords
 
+    val outputProxyRecords = mostSuspiciousProxyRecords.select(OutSchema:_*)
 
-    val filteredDF = scoredDF.filter(Score +  " <= " + config.threshold)
-    val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
+    logger.info("Proxy suspicious connects analysis completed")
+    logger.info("Saving results to: " + config.hdfsScoredConnect)
+    outputProxyRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
 
-    logger.info("Persisting data to hdfs: " + config.hdfsScoredConnect)
-    mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+    val invalidProxyRecords = filterAndSelectInvalidProxyRecords(inputProxyRecords)
+    dataValidation.showAndSaveInvalidRecords(invalidProxyRecords, config.hdfsScoredConnect, logger)
 
-    logger.info("Proxy suspcicious connects completed")
+    val corruptProxyRecords = filterAndSelectCorruptProxyRecords(scoredProxyRecords)
+    dataValidation.showAndSaveCorruptRecords(corruptProxyRecords, config.hdfsScoredConnect, logger)
   }
 
 
@@ -66,4 +76,118 @@ object ProxySuspiciousConnectsAnalysis {
 
     model.score(sparkContext, data)
   }
+
+  /**
+    * Keeps the proxy records whose Date, Time, ClientIP, Host and FullURI are all non-null.
+    * @param inputProxyRecords raw proxy records.
+    * @return passing records on the InSchema columns; null UserAgent and ResponseContentType get the defaults.
+    */
+  def filterAndSelectCleanProxyRecords(inputProxyRecords: DataFrame): DataFrame ={
+    // Only these five fields are required; the other selected columns may stay null.
+    val cleanProxyRecordsFilter =  inputProxyRecords(Date).isNotNull &&
+      inputProxyRecords(Time).isNotNull &&
+      inputProxyRecords(ClientIP).isNotNull &&
+      inputProxyRecords(Host).isNotNull &&
+      inputProxyRecords(FullURI).isNotNull
+
+    inputProxyRecords
+      .filter(cleanProxyRecordsFilter)
+      .select(InSchema:_*)
+      .na.fill(DefaultUserAgent, Seq(UserAgent))
+      .na.fill(DefaultResponseContentType, Seq(ResponseContentType))
+  }
+
+  /**
+    * Selects the proxy records rejected by filterAndSelectCleanProxyRecords (the negation of its filter).
+    * @param inputProxyRecords raw proxy records.
+    * @return records with a null Date, Time, ClientIP, Host or FullURI, projected onto the InSchema columns.
+    */
+  def filterAndSelectInvalidProxyRecords(inputProxyRecords: DataFrame): DataFrame ={
+    // A single null among the five required fields is enough to mark the record invalid.
+    val invalidProxyRecordsFilter = inputProxyRecords(Date).isNull ||
+      inputProxyRecords(Time).isNull ||
+      inputProxyRecords(ClientIP).isNull ||
+      inputProxyRecords(Host).isNull ||
+      inputProxyRecords(FullURI).isNull
+
+    inputProxyRecords
+      .filter(invalidProxyRecordsFilter)
+      .select(InSchema: _*)
+  }
+
+  /**
+    * Keeps the scored records whose score is at most the threshold and greater than dataValidation.ScoreError.
+    * @param scoredProxyRecords scored proxy records.
+    * @param threshold score tolerance.
+    * @return the scored records satisfying both score conditions, with all columns unchanged.
+    */
+  def filterScoredProxyRecords(scoredProxyRecords: DataFrame, threshold: Double): DataFrame ={
+    // NOTE(review): ScoreError looks like the sentinel score of records that failed scoring - confirm.
+    val filteredProxyRecordsFilter = scoredProxyRecords(Score).leq(threshold) &&
+      scoredProxyRecords(Score).gt(dataValidation.ScoreError)
+
+    scoredProxyRecords.filter(filteredProxyRecordsFilter)
+  }
+
+  /**
+    * Selects the scored records whose score equals dataValidation.ScoreError.
+    * @param scoredProxyRecords scored proxy records.
+    * @return the matching records, projected onto the OutSchema columns.
+    */
+  def filterAndSelectCorruptProxyRecords(scoredProxyRecords: DataFrame): DataFrame ={
+    // NOTE(review): ScoreError is presumably the marker for records that could not be scored - confirm.
+    val corruptProxyRecordsFilter = scoredProxyRecords(Score).equalTo(dataValidation.ScoreError)
+
+    scoredProxyRecords
+      .filter(corruptProxyRecordsFilter)
+      .select(OutSchema: _*)
+  }
+
+  val DefaultUserAgent = "-" // substituted for null UserAgent values in clean proxy records
+  val DefaultResponseContentType = "-" // substituted for null ResponseContentType values in clean proxy records
+  // Column selectors for the fields taken from the input proxy records.
+  val InSchema = StructType(
+    List(DateField,
+      TimeField,
+      ClientIPField,
+      HostField,
+      ReqMethodField,
+      UserAgentField,
+      ResponseContentTypeField,
+      DurationField,
+      UserNameField,
+      WebCatField,
+      RefererField,
+      RespCodeField,
+      URIPortField,
+      URIPathField,
+      URIQueryField,
+      ServerIPField,
+      SCBytesField,
+      CSBytesField,
+      FullURIField)).fieldNames.map(col)
+  // Column selectors for the saved results: the InSchema fields followed by the word and score fields.
+  val OutSchema = StructType(
+    List(DateField,
+      TimeField,
+      ClientIPField,
+      HostField,
+      ReqMethodField,
+      UserAgentField,
+      ResponseContentTypeField,
+      DurationField,
+      UserNameField,
+      WebCatField,
+      RefererField,
+      RespCodeField,
+      URIPortField,
+      URIPathField,
+      URIQueryField,
+      ServerIPField,
+      SCBytesField,
+      CSBytesField,
+      FullURIField,
+      WordField,
+      ScoreField)).fieldNames.map(col)
+
 }
\ No newline at end of file