You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by ev...@apache.org on 2017/01/25 18:36:23 UTC

[11/49] incubator-spot git commit: test_dns_topdomain

test_dns_topdomain

some unit testing necessitated refactoring


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/0f1a6c5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/0f1a6c5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/0f1a6c5e

Branch: refs/heads/master
Commit: 0f1a6c5e3c35247afce9c4df7da548f274d2b015
Parents: 83d157a
Author: nlsegerl <na...@intel.com>
Authored: Mon Dec 12 14:19:56 2016 -0800
Committer: nlsegerl <na...@intel.com>
Committed: Mon Dec 12 14:19:56 2016 -0800

----------------------------------------------------------------------
 .../dns/DNSSuspiciousConnectsAnalysis.scala     | 42 ++++++++++----
 .../FlowSuspiciousConnectsAnalysis.scala        | 33 +++++++++--
 .../proxy/ProxySuspiciousConnectsAnalysis.scala | 44 ++++++++++-----
 .../apache/spot/utilities/DataFrameUtils.scala  | 39 -------------
 .../org/apache/spot/DNSWordCreationTest.scala   |  4 +-
 .../org/apache/spot/FlowWordCreatorTest.scala   |  2 +-
 .../dns/DNSSuspiciousConnectsAnalysisTest.scala | 59 ++++++++++++++++++++
 7 files changed, 149 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index d0e6da1..f2ce7a4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -1,5 +1,6 @@
 package org.apache.spot.dns
 
+import org.apache.log4j.Logger
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -7,8 +8,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
-import org.apache.log4j.Logger
-
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.ModelSchema
 
 /**
@@ -42,6 +41,7 @@ object DNSSuspiciousConnectsAnalysis {
 
   /**
     * Run suspicious connections analysis on DNS log data.
+    * Saves the most suspicious connections to a CSV file on HDFS.
     *
     * @param config Object encapsulating runtime parameters and CLI options.
     * @param sparkContext
@@ -58,22 +58,42 @@ object DNSSuspiciousConnectsAnalysis {
       .filter(Timestamp + " is not null and " + UnixTimestamp + " is not null")
       .select(inColumns:_*)
 
-    logger.info("Training the model")
 
-    val model =
-      DNSSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, rawDataDF, config.topicCount)
 
-    logger.info("Scoring")
-    val scoredDF = model.score(sparkContext, sqlContext, rawDataDF)
+    val scoredDF = detectDNSAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
 
 
     val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
     val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
 
-    val outputDF = mostSusipiciousDF.select(OutColumns:_*).sort(Score)
-
-    logger.info("DNS  suspcicious connects analysis completed.")
+    mostSusipiciousDF.select(OutColumns:_*).sort(Score)
     logger.info("Saving results to : " + config.hdfsScoredConnect)
-    outputDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+
+
+    mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+  }
+
+  /**
+    * Identify anomalous DNS log entries in the provided data frame.
+    *
+    * @param data Data frame of DNS entries; used both to train the model and as the input that is scored.
+    * @param config Runtime configuration passed to model training; also supplies the topic count.
+    * @param sparkContext Spark context passed to model training and to scoring.
+    * @param sqlContext Spark SQL context passed to model training and to scoring.
+    * @param logger Logger that receives the progress messages; also passed to model training.
+    * @return Data frame returned by the trained model's score call on the input data.
+    */
+  def detectDNSAnomalies(data: DataFrame, config: SuspiciousConnectsConfig,
+                         sparkContext: SparkContext,
+                         sqlContext: SQLContext,
+                         logger: Logger) : DataFrame = {
+
+    // A new model is trained on every call, using the same data frame that is then scored.
+    logger.info("Fitting probabilistic model to data")
+    val model =
+      DNSSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data, config.topicCount)
+
+    logger.info("Identifying outliers")
+    model.score(sparkContext, sqlContext, data)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index 32c0f6e..098a787 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -17,7 +17,7 @@ import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
 
 object FlowSuspiciousConnectsAnalysis {
 
-  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger)(implicit outputDelimiter: String) = {
+  def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
 
     logger.info("Loading data")
 
@@ -28,11 +28,7 @@ object FlowSuspiciousConnectsAnalysis {
 
     logger.info("Training the model")
 
-    val model =
-      FlowSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, rawDataDF, config.topicCount)
-
-    logger.info("Scoring")
-    val scoredDF = model.score(sparkContext, sqlContext, rawDataDF)
+    val scoredDF = detectFlowAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
 
     val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
 
@@ -46,6 +42,31 @@ object FlowSuspiciousConnectsAnalysis {
     outputDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
   }
 
+  /**
+    * Identify anomalous netflow log entries in the provided data frame.
+    *
+    * @param data Data frame of netflow entries; used both to train the model and as the input that is scored.
+    * @param config Runtime configuration passed to model training; also supplies the topic count.
+    * @param sparkContext Spark context passed to model training and to scoring.
+    * @param sqlContext Spark SQL context passed to model training and to scoring.
+    * @param logger Logger that receives the progress messages; also passed to model training.
+    * @return Data frame returned by the trained model's score call on the input data.
+    */
+  def detectFlowAnomalies(data: DataFrame,
+                          config: SuspiciousConnectsConfig,
+                         sparkContext: SparkContext,
+                         sqlContext: SQLContext,
+                         logger: Logger) : DataFrame = {
+
+    // A new model is trained on every call, using the same data frame that is then scored.
+    logger.info("Fitting probabilistic model to data")
+    val model =
+      FlowSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data, config.topicCount)
+
+    logger.info("Identifying outliers")
+    model.score(sparkContext, sqlContext, data)
+  }
+
   val inSchema = StructType(List(TimeReceivedField,
     YearField,
     MonthField,

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 1131406..38150ca 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -2,10 +2,9 @@ package org.apache.spot.proxy
 
 import org.apache.log4j.Logger
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.proxy.ProxySchema._
-import org.apache.spot.utilities.DataFrameUtils
 
 /**
   * Run suspicious connections analysis on proxy data.
@@ -31,23 +30,40 @@ object ProxySuspiciousConnectsAnalysis {
       select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, Duration, UserName,
         WebCat, Referer, RespCode, URIPort, URIPath, URIQuery, ServerIP, SCBytes, CSBytes, FullURI)
 
-    logger.info("Training the model")
-    val model =
-      ProxySuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, rawDataDF)
+    val scoredDF = detectProxyAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
 
-    logger.info("Scoring")
-    val scoredDF = model.score(sparkContext, rawDataDF)
-
-    // take the maxResults least probable events of probability below the threshold and sort
 
     val filteredDF = scoredDF.filter(Score +  " <= " + config.threshold)
-    val topRows = DataFrameUtils.dfTakeOrdered(filteredDF, "score", config.maxResults)
-    val scoreIndex = scoredDF.schema.fieldNames.indexOf("score")
-    val outputRDD = sparkContext.parallelize(topRows).sortBy(row => row.getDouble(scoreIndex))
+    val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
 
-    logger.info("Persisting data")
-    outputRDD.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+    logger.info("Persisting data to hdfs: " + config.hdfsScoredConnect)
+    mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
 
     logger.info("Proxy suspcicious connects completed")
   }
+
+
+  /**
+    * Identify anomalous proxy log entries in the provided data frame.
+    *
+    * @param data Data frame of proxy entries; used both to train the model and as the input that is scored.
+    * @param config Runtime configuration passed to model training.
+    * @param sparkContext Spark context passed to model training and to scoring.
+    * @param sqlContext Spark SQL context passed to model training only; scoring does not take it.
+    * @param logger Logger that receives the progress messages; also passed to model training.
+    * @return Data frame returned by the trained model's score call on the input data.
+    */
+  def detectProxyAnomalies(data: DataFrame,
+                          config: SuspiciousConnectsConfig,
+                          sparkContext: SparkContext,
+                          sqlContext: SQLContext,
+                          logger: Logger) : DataFrame = {
+
+    // A new model is trained on every call, using the same data frame that is then scored.
+    logger.info("Fitting probabilistic model to data")
+    val model = ProxySuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data)
+    logger.info("Identifying outliers")
+
+    model.score(sparkContext, data)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala
deleted file mode 100644
index d5af6ee..0000000
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.spot.utilities
-
-import org.apache.spark.sql.{DataFrame, Row}
-
-/**
-  * Some handy operations on dataframes not provided by Apache Spark.
-  */
-object  DataFrameUtils {
-
-  /**
-    * Returns the rows of a dataframe whose values in a provided column are in the first k
-    * (least to greatest) values.  If strictly fewer than k rows are in the dataframe, all rows are returned.
-    *
-    * Dataframe analog to takeOrdered.
-    *
-    * @param df Input dataframe.
-    * @param colName Column to consider.
-    * @param k Maximum number of rows to return.
-    * @return Array of (at most k) rows.
-    */
-  def dfTakeOrdered(df: DataFrame, colName: String, k: Int) : Array[Row] = {
-    val count = df.count
-
-    val takeCount  = if (k == -1 || count < k) {
-      count.toInt
-    } else {
-      k
-    }
-
-    val colIndex = df.schema.fieldNames.indexOf(colName)
-
-    class DataOrdering() extends Ordering[Row] {
-      def compare(row1: Row, row2: Row) = row1.getDouble(colIndex).compare(row2.getDouble(colIndex))
-    }
-
-    implicit val rowOrdering = new DataOrdering()
-    df.rdd.takeOrdered(takeCount)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala b/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
index 1b02333..b756358 100644
--- a/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
@@ -1,11 +1,9 @@
 package org.apache.spot
 
 
-import javax.swing.text.Utilities
 
-import org.apache.spot.dns.{DNSSuspiciousConnectsAnalysis, DNSWordCreation}
 import org.apache.spot.testutils.TestingSparkContextFlatSpec
-import org.apache.spot.utilities.{CountryCodes, Entropy, TopDomains}
+import org.apache.spot.utilities.Entropy
 import org.scalatest.Matchers
 
 class DNSWordCreationTest extends TestingSparkContextFlatSpec with Matchers {

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala b/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
index f3cf715..832bbd1 100644
--- a/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
@@ -39,7 +39,7 @@ class FlowWordCreatorTest extends FlatSpec with Matchers {
 
 
     dstWord shouldBe "-1_23_5_2_0"
-    srcWord shouldBe "23_5_2_0"
+    srcWord shouldBe  "23_5_2_0"
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
new file mode 100644
index 0000000..138f32e
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -0,0 +1,59 @@
+package org.apache.spot.dns
+
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
+import org.apache.spot.dns.DNSSchema._
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.scalatest.Matchers
+/** Row type for the toy DNS records built in the test in this file; a sequence of these is converted with toDF. */
+case class DNSInput(frame_time:String, unix_tstamp:Long, frame_len:Int, ip_dst: String, dns_qry_name:String, dns_qry_class:String, dns_qry_type: Int, dns_qry_rcode: Int)
+/** Exercises DNSSuspiciousConnectsAnalysis.detectDNSAnomalies on a five-record toy data set. */
+class DNSSuspiciousConnectsAnalysisTest  extends TestingSparkContextFlatSpec with Matchers {
+  // Configuration handed to detectDNSAnomalies; the input, feedback and output paths are left empty.
+  val testConfig = SuspiciousConnectsConfig(analysis = "dns",
+  inputPath = "",
+  feedbackFile = "",
+  duplicationFactor = 1,
+  topicCount = 20,
+  hdfsScoredConnect = "",
+  threshold = 1.0d,
+  maxResults = 1000,
+  outputDelimiter = "\t",
+  ldaPRGSeed = None,
+  ldaMaxiterations = 20,
+  ldaAlpha = 1.02,
+  ldaBeta = 1.001)
+
+
+  "dns supicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly" in {
+
+    val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+    logger.setLevel(Level.INFO)
+    val testSqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
+    // The two records are identical except for frame_len: 1 (anomalous) versus 168 (typical).
+    val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT",	1463735425L,	1,	"172.16.9.132",	"turner.com.122.2o7.net",	"0x00000001",	1,	0)
+    val typicalRecord   = DNSInput("May 20 2016 02:10:25.970987000 PDT",	1463735425L,	168,	"172.16.9.132",	"turner.com.122.2o7.net",	"0x00000001",	1,	0)
+
+    import testSqlContext.implicits._
+    // Input data: one anomalous record followed by four copies of the typical record.
+    val data = sparkContext.parallelize(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord)).toDF
+    // NOTE(review): passes `sqlContext`, not the `testSqlContext` created above - presumably inherited from the base spec; confirm.
+    val scoredData = DNSSuspiciousConnectsAnalysis.detectDNSAnomalies(data, testConfig,
+      sparkContext,
+      sqlContext,
+      logger)
+
+    // Pull the scores out by frame length: 1 selects the anomalous record, 168 the typical ones.
+    val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
+    val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
+    // The anomalous score must be within 0.01 of 0.2 and each of the four typical scores within 0.01 of 0.8.
+    Math.abs(anomalyScore - 0.2d)  should be <= 0.01d
+    typicalScores.length shouldBe 4
+    Math.abs(typicalScores(0) - 0.8d)  should be <= 0.01d
+    Math.abs(typicalScores(1) - 0.8d)  should be <= 0.01d
+    Math.abs(typicalScores(2) - 0.8d)  should be <= 0.01d
+    Math.abs(typicalScores(3) - 0.8d)  should be <= 0.01d
+  }
+
+
+}