You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@spot.apache.org by ev...@apache.org on 2017/01/25 18:36:23 UTC
[11/49] incubator-spot git commit: test_dns_topdomain
test_dns_topdomain
some unit testing necessitated refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/0f1a6c5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/0f1a6c5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/0f1a6c5e
Branch: refs/heads/master
Commit: 0f1a6c5e3c35247afce9c4df7da548f274d2b015
Parents: 83d157a
Author: nlsegerl <na...@intel.com>
Authored: Mon Dec 12 14:19:56 2016 -0800
Committer: nlsegerl <na...@intel.com>
Committed: Mon Dec 12 14:19:56 2016 -0800
----------------------------------------------------------------------
.../dns/DNSSuspiciousConnectsAnalysis.scala | 42 ++++++++++----
.../FlowSuspiciousConnectsAnalysis.scala | 33 +++++++++--
.../proxy/ProxySuspiciousConnectsAnalysis.scala | 44 ++++++++++-----
.../apache/spot/utilities/DataFrameUtils.scala | 39 -------------
.../org/apache/spot/DNSWordCreationTest.scala | 4 +-
.../org/apache/spot/FlowWordCreatorTest.scala | 2 +-
.../dns/DNSSuspiciousConnectsAnalysisTest.scala | 59 ++++++++++++++++++++
7 files changed, 149 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index d0e6da1..f2ce7a4 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -1,5 +1,6 @@
package org.apache.spot.dns
+import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -7,8 +8,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.dns.DNSSchema._
import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
-import org.apache.log4j.Logger
-
import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.ModelSchema
/**
@@ -42,6 +41,7 @@ object DNSSuspiciousConnectsAnalysis {
/**
* Run suspicious connections analysis on DNS log data.
+ * Saves the most suspicious connections to HDFS as delimited text (fields joined with config.outputDelimiter).
*
* @param config Object encapsulating runtime parameters and CLI options.
* @param sparkContext
@@ -58,22 +58,42 @@ object DNSSuspiciousConnectsAnalysis {
.filter(Timestamp + " is not null and " + UnixTimestamp + " is not null")
.select(inColumns:_*)
- logger.info("Training the model")
- val model =
- DNSSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, rawDataDF, config.topicCount)
- logger.info("Scoring")
- val scoredDF = model.score(sparkContext, sqlContext, rawDataDF)
+ val scoredDF = detectDNSAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
- val outputDF = mostSusipiciousDF.select(OutColumns:_*).sort(Score)
-
- logger.info("DNS suspcicious connects analysis completed.")
+ mostSusipiciousDF.select(OutColumns:_*).sort(Score)
logger.info("Saving results to : " + config.hdfsScoredConnect)
- outputDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+
+
+ mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+ }
+
+ /**
+ * Identify anomalous DNS log entries in the provided data frame by fitting a new model to it and scoring it.
+ *
+ * @param data Data frame of DNS entries; used both to train the model and as the data that is scored.
+ * @param config Runtime configuration; passed to model training together with config.topicCount.
+ * @param sparkContext Spark context forwarded to model training and to scoring.
+ * @param sqlContext SQL context forwarded to model training and to scoring.
+ * @param logger Logger for progress messages; also passed to model training.
+ * @return Data frame returned by DNSSuspiciousConnectsModel.score on `data`; run() filters and orders it by its Score column.
+ */
+ def detectDNSAnomalies(data: DataFrame, config: SuspiciousConnectsConfig,
+ sparkContext: SparkContext,
+ sqlContext: SQLContext,
+ logger: Logger) : DataFrame = {
+
+
+ logger.info("Fitting probabilistic model to data")
+ val model =
+ DNSSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data, config.topicCount)
+
+ logger.info("Identifying outliers")
+ model.score(sparkContext, sqlContext, data)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index 32c0f6e..098a787 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -17,7 +17,7 @@ import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
object FlowSuspiciousConnectsAnalysis {
- def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger)(implicit outputDelimiter: String) = {
+ def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
logger.info("Loading data")
@@ -28,11 +28,7 @@ object FlowSuspiciousConnectsAnalysis {
logger.info("Training the model")
- val model =
- FlowSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, rawDataDF, config.topicCount)
-
- logger.info("Scoring")
- val scoredDF = model.score(sparkContext, sqlContext, rawDataDF)
+ val scoredDF = detectFlowAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
@@ -46,6 +42,31 @@ object FlowSuspiciousConnectsAnalysis {
outputDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
}
+ /**
+ * Identify anomalous netflow log entries in the provided data frame by fitting a new model to it and scoring it.
+ *
+ * @param data Data frame of netflow entries; used both to train the model and as the data that is scored.
+ * @param config Runtime configuration; passed to model training together with config.topicCount.
+ * @param sparkContext Spark context forwarded to model training and to scoring.
+ * @param sqlContext SQL context forwarded to model training and to scoring.
+ * @param logger Logger for progress messages; also passed to model training.
+ * @return Data frame returned by FlowSuspiciousConnectsModel.score on `data`; run() filters it by its Score column.
+ */
+ def detectFlowAnomalies(data: DataFrame,
+ config: SuspiciousConnectsConfig,
+ sparkContext: SparkContext,
+ sqlContext: SQLContext,
+ logger: Logger) : DataFrame = {
+
+
+ logger.info("Fitting probabilistic model to data")
+ val model =
+ FlowSuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data, config.topicCount)
+
+ logger.info("Identifying outliers")
+ model.score(sparkContext, sqlContext, data)
+ }
+
val inSchema = StructType(List(TimeReceivedField,
YearField,
MonthField,
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 1131406..38150ca 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -2,10 +2,9 @@ package org.apache.spot.proxy
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.proxy.ProxySchema._
-import org.apache.spot.utilities.DataFrameUtils
/**
* Run suspicious connections analysis on proxy data.
@@ -31,23 +30,40 @@ object ProxySuspiciousConnectsAnalysis {
select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, Duration, UserName,
WebCat, Referer, RespCode, URIPort, URIPath, URIQuery, ServerIP, SCBytes, CSBytes, FullURI)
- logger.info("Training the model")
- val model =
- ProxySuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, rawDataDF)
+ val scoredDF = detectProxyAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
- logger.info("Scoring")
- val scoredDF = model.score(sparkContext, rawDataDF)
-
- // take the maxResults least probable events of probability below the threshold and sort
val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
- val topRows = DataFrameUtils.dfTakeOrdered(filteredDF, "score", config.maxResults)
- val scoreIndex = scoredDF.schema.fieldNames.indexOf("score")
- val outputRDD = sparkContext.parallelize(topRows).sortBy(row => row.getDouble(scoreIndex))
+ val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
- logger.info("Persisting data")
- outputRDD.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+ logger.info("Persisting data to hdfs: " + config.hdfsScoredConnect)
+ mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
logger.info("Proxy suspcicious connects completed")
}
+
+
+ /**
+ * Identify anomalous proxy log entries in the provided data frame by fitting a new model to it and scoring it.
+ *
+ * @param data Data frame of proxy entries; used both to train the model and as the data that is scored.
+ * @param config Runtime configuration; passed to model training.
+ * @param sparkContext Spark context forwarded to model training and to scoring.
+ * @param sqlContext SQL context forwarded to model training only (scoring does not take it).
+ * @param logger Logger for progress messages; also passed to model training.
+ * @return Data frame returned by ProxySuspiciousConnectsModel.score on `data`; run() filters and orders it by its Score column.
+ */
+ def detectProxyAnomalies(data: DataFrame,
+ config: SuspiciousConnectsConfig,
+ sparkContext: SparkContext,
+ sqlContext: SQLContext,
+ logger: Logger) : DataFrame = {
+
+
+ logger.info("Fitting probabilistic model to data")
+ val model = ProxySuspiciousConnectsModel.trainNewModel(sparkContext, sqlContext, logger, config, data)
+ logger.info("Identifying outliers")
+
+ model.score(sparkContext, data)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala
deleted file mode 100644
index d5af6ee..0000000
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/DataFrameUtils.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.spot.utilities
-
-import org.apache.spark.sql.{DataFrame, Row}
-
-/**
- * Some handy operations on dataframes not provided by Apache Spark.
- */
-object DataFrameUtils {
-
- /**
- * Returns the rows of a dataframe whose values in a provided column are in the first k
- * (least to greatest) values. If strictly fewer than k rows are in the dataframe, all rows are returned.
- *
- * Dataframe analog to takeOrdered.
- *
- * @param df Input dataframe.
- * @param colName Column to consider.
- * @param k Maximum number of rows to return.
- * @return Array of (at most k) rows.
- */
- def dfTakeOrdered(df: DataFrame, colName: String, k: Int) : Array[Row] = {
- val count = df.count
-
- val takeCount = if (k == -1 || count < k) {
- count.toInt
- } else {
- k
- }
-
- val colIndex = df.schema.fieldNames.indexOf(colName)
-
- class DataOrdering() extends Ordering[Row] {
- def compare(row1: Row, row2: Row) = row1.getDouble(colIndex).compare(row2.getDouble(colIndex))
- }
-
- implicit val rowOrdering = new DataOrdering()
- df.rdd.takeOrdered(takeCount)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala b/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
index 1b02333..b756358 100644
--- a/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/DNSWordCreationTest.scala
@@ -1,11 +1,9 @@
package org.apache.spot
-import javax.swing.text.Utilities
-import org.apache.spot.dns.{DNSSuspiciousConnectsAnalysis, DNSWordCreation}
import org.apache.spot.testutils.TestingSparkContextFlatSpec
-import org.apache.spot.utilities.{CountryCodes, Entropy, TopDomains}
+import org.apache.spot.utilities.Entropy
import org.scalatest.Matchers
class DNSWordCreationTest extends TestingSparkContextFlatSpec with Matchers {
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala b/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
index f3cf715..832bbd1 100644
--- a/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/FlowWordCreatorTest.scala
@@ -39,7 +39,7 @@ class FlowWordCreatorTest extends FlatSpec with Matchers {
dstWord shouldBe "-1_23_5_2_0"
- srcWord shouldBe "23_5_2_0"
+ srcWord shouldBe "23_5_2_0"
}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0f1a6c5e/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
new file mode 100644
index 0000000..138f32e
--- /dev/null
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -0,0 +1,59 @@
+package org.apache.spot.dns
+
+import org.apache.log4j.{Level, LogManager}
+import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
+import org.apache.spot.dns.DNSSchema._
+import org.apache.spot.testutils.TestingSparkContextFlatSpec
+import org.scalatest.Matchers
+// Toy DNS input row. After toDF its field names become column names, which presumably must match DNSSchema (e.g. frame_len for FrameLength) - verify.
+case class DNSInput(frame_time:String, unix_tstamp:Long, frame_len:Int, ip_dst: String, dns_qry_name:String, dns_qry_class:String, dns_qry_type: Int, dns_qry_rcode: Int)
+
+class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with Matchers {
+ // Configuration handed to detectDNSAnomalies; topicCount is forwarded to model training.
+ val testConfig = SuspiciousConnectsConfig(analysis = "dns",
+ inputPath = "",
+ feedbackFile = "",
+ duplicationFactor = 1,
+ topicCount = 20,
+ hdfsScoredConnect = "",
+ threshold = 1.0d,
+ maxResults = 1000,
+ outputDelimiter = "\t",
+ ldaPRGSeed = None,
+ ldaMaxiterations = 20,
+ ldaAlpha = 1.02,
+ ldaBeta = 1.001)
+
+
+ "dns supicious connects analysis" should "estimate correct probabilities in toy data with framelength anomaly" in {
+
+ val logger = LogManager.getLogger("SuspiciousConnectsAnalysis")
+ logger.setLevel(Level.INFO)
+ val testSqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
+ // One record with an unusual frame length (1) among four identical typical records (frame length 168).
+ val anomalousRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 1, "172.16.9.132", "turner.com.122.2o7.net", "0x00000001", 1, 0)
+ val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "turner.com.122.2o7.net", "0x00000001", 1, 0)
+
+ import testSqlContext.implicits._
+
+ val data = sparkContext.parallelize(Seq(anomalousRecord, typicalRecord, typicalRecord, typicalRecord, typicalRecord)).toDF
+ // Score with the same SQLContext whose implicits created `data`, rather than a second context.
+ val scoredData = DNSSuspiciousConnectsAnalysis.detectDNSAnomalies(data, testConfig,
+ sparkContext,
+ testSqlContext,
+ logger)
+
+
+ val anomalyScore = scoredData.filter(scoredData(FrameLength) === 1).first().getAs[Double](Score)
+ val typicalScores = scoredData.filter(scoredData(FrameLength) === 168).collect().map(_.getAs[Double](Score))
+ // The anomaly should score about 0.2 and each typical record about 0.8 (tolerance 0.01, inclusive).
+ anomalyScore should be (0.2d +- 0.01d)
+ typicalScores.length shouldBe 4
+ typicalScores(0) should be (0.8d +- 0.01d)
+ typicalScores(1) should be (0.8d +- 0.01d)
+ typicalScores(2) should be (0.8d +- 0.01d)
+ typicalScores(3) should be (0.8d +- 0.01d)
+ }
+
+
+}