Posted to commits@spot.apache.org by ev...@apache.org on 2017/01/25 18:36:45 UTC
[33/49] incubator-spot git commit: unit_test_cleanup
unit_test_cleanup
CODE BASE IS NOT FUNCTIONAL AT THIS COMMIT
restructuring code for an easier merge of the data validation code with the unit test changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/c638e2fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/c638e2fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/c638e2fa
Branch: refs/heads/master
Commit: c638e2fac593dcd91d900f5b350959a8bc0d49b2
Parents: 986cebf
Author: nlsegerl <na...@intel.com>
Authored: Wed Jan 4 11:31:19 2017 -0800
Committer: nlsegerl <na...@intel.com>
Committed: Wed Jan 4 11:31:19 2017 -0800
----------------------------------------------------------------------
.../org/apache/spot/SuspiciousConnects.scala | 20 ++-
.../dns/DNSSuspiciousConnectsAnalysis.scala | 169 +++++++++++++++----
.../FlowSuspiciousConnectsAnalysis.scala | 116 ++++++++++---
.../proxy/ProxySuspiciousConnectsAnalysis.scala | 150 ++++++++++++++--
4 files changed, 387 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
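The core of this restructuring: each analysis' run(...) now receives a pre-loaded DataFrame instead of reading config.inputPath itself, so the input is read exactly once in SuspiciousConnects via InputOutputDataHandler. The handler's implementation is not part of this diff; the following is a minimal sketch of the shape getInputDataFrame could take (the Try-based error handling and log message are assumptions, not repo code).

import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SQLContext}

import scala.util.{Failure, Success, Try}

// Sketch only: the real InputOutputDataHandler lives outside this diff.
object InputOutputDataHandlerSketch {

  // Returns None when the path cannot be read as parquet, letting the
  // caller fall back to sqlContext.emptyDataFrame and bail out early.
  def getInputDataFrame(sqlContext: SQLContext,
                        inputPath: String,
                        logger: Logger): Option[DataFrame] =
    Try(sqlContext.read.parquet(inputPath)) match {
      case Success(df) => Some(df)
      case Failure(e) =>
        logger.warn("Unable to read parquet data from " + inputPath + ": " + e.getMessage)
        None
    }
}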
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
index 8751189..dfd5b8f 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
@@ -7,7 +7,7 @@ import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.dns.DNSSuspiciousConnectsAnalysis
import org.apache.spot.netflow.FlowSuspiciousConnectsAnalysis
import org.apache.spot.proxy.ProxySuspiciousConnectsAnalysis
-
+import org.apache.spot.utilities.data.InputOutputDataHandler
/**
* Top level entrypoint to execute suspicious connections analysis on network data.
@@ -43,13 +43,25 @@ object SuspiciousConnects {
val sqlContext = new SQLContext(sparkContext)
implicit val outputDelimiter = config.outputDelimiter
+ val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sqlContext, config.inputPath, logger)
+ .getOrElse(sqlContext.emptyDataFrame)
+ if(inputDataFrame.rdd.isEmpty()) {
+ logger.error("Couldn't read data from location " + config.inputPath +", please verify it's a valid location and that " +
+ s"contains parquet files with a given schema and try again.")
+ System.exit(0)
+ }
+
analysis match {
- case "flow" => FlowSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger)
- case "dns" => DNSSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger)
- case "proxy" => ProxySuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger)
+ case "flow" => FlowSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger, inputDataFrame)
+ case "dns" => DNSSuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger, inputDataFrame)
+ case "proxy" => ProxySuspiciousConnectsAnalysis.run(config, sparkContext, sqlContext, logger, inputDataFrame)
case _ => logger.error("Unsupported (or misspelled) analysis: " + analysis)
}
+ InputOutputDataHandler.mergeResultsFiles(sparkContext, config.hdfsScoredConnect, analysis, logger)
+
sparkContext.stop()
case None => logger.error("Error parsing arguments")
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index 244b941..5db5c50 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -8,7 +8,11 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.dns.DNSSchema._
import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
+import org.apache.log4j.Logger
import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.ModelSchema
+import org.apache.spot.proxy.ProxySchema.Score
+import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}
+
/**
* The suspicious connections analysis of DNS log data develops a probabilistic model the DNS queries
@@ -17,27 +21,6 @@ import org.apache.spot.dns.model.DNSSuspiciousConnectsModel.ModelSchema
object DNSSuspiciousConnectsAnalysis {
- val inSchema = StructType(List(TimestampField, UnixTimestampField, FrameLengthField, ClientIPField,
- QueryNameField, QueryClassField, QueryTypeField, QueryResponseCodeField))
-
- val inColumns = inSchema.fieldNames.map(col)
-
-
- assert(ModelSchema.fields.forall(inSchema.fields.contains(_)))
-
- val OutSchema = StructType(
- List(TimestampField,
- UnixTimestampField,
- FrameLengthField,
- ClientIPField,
- QueryNameField,
- QueryClassField,
- QueryTypeField,
- QueryResponseCodeField,
- ScoreField))
-
- val OutColumns = OutSchema.fieldNames.map(col)
-
/**
* Run suspicious connections analysis on DNS log data.
@@ -48,31 +31,37 @@ object DNSSuspiciousConnectsAnalysis {
* @param sqlContext
* @param logger
*/
- def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
- logger.info("Starting DNS suspicious connects analysis.")
+ def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+ inputDNSRecords: DataFrame) = {
- logger.info("Loading data")
+ logger.info("Starting DNS suspicious connects analysis.")
+ val userDomain = config.userDomain
- val rawDataDF = sqlContext.read.parquet(config.inputPath)
- .filter(Timestamp + " is not null and " + UnixTimestamp + " is not null")
- .select(inColumns:_*)
+ val cleanDNSRecords = filterAndSelectCleanDNSRecords(inputDNSRecords)
+ logger.info("Training the model")
+ val scoredDNSRecords = detectDNSAnomalies(cleanDNSRecords, config, sparkContext, sqlContext, logger)
- val scoredDF = detectDNSAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
+ val filteredDNSRecords = filterScoredDNSRecords(scoredDNSRecords, config.threshold)
+ val orderedDNSRecords = filteredDNSRecords.orderBy(Score)
+ val mostSuspiciousDNSRecords = if(config.maxResults > 0) orderedDNSRecords.limit(config.maxResults) else orderedDNSRecords
- val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
- val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
+ val outputDNSRecords = mostSuspiciousDNSRecords.select(OutSchema:_*).sort(Score)
- mostSusipiciousDF.select(OutColumns:_*).sort(Score)
+ logger.info("DNS suspicious connects analysis completed.")
logger.info("Saving results to : " + config.hdfsScoredConnect)
+ outputDNSRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+ val invalidDNSRecords = filterAndSelectInvalidDNSRecords(inputDNSRecords)
+ dataValidation.showAndSaveInvalidRecords(invalidDNSRecords, config.hdfsScoredConnect, logger)
- mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+ val corruptDNSRecords = filterAndSelectCorruptDNSRecords(scoredDNSRecords)
+ dataValidation.showAndSaveCorruptRecords(corruptDNSRecords, config.hdfsScoredConnect, logger)
}
/**
@@ -98,4 +87,120 @@ object DNSSuspiciousConnectsAnalysis {
logger.info("Identifying outliers")
model.score(sparkContext, sqlContext, data, userDomain)
}
+
+ /**
+ *
+ * @param inputDNSRecords raw DNS records.
+ * @return
+ */
+ def filterAndSelectCleanDNSRecords(inputDNSRecords: DataFrame): DataFrame ={
+
+ val cleanDNSRecordsFilter = inputDNSRecords(Timestamp).isNotNull &&
+ inputDNSRecords(Timestamp).notEqual("") &&
+ inputDNSRecords(Timestamp).notEqual("-") &&
+ inputDNSRecords(UnixTimestamp).isNotNull &&
+ inputDNSRecords(FrameLength).isNotNull &&
+ inputDNSRecords(QueryName).isNotNull &&
+ inputDNSRecords(QueryName).notEqual("") &&
+ inputDNSRecords(QueryName).notEqual("-") &&
+ inputDNSRecords(QueryName).notEqual("(empty)") &&
+ inputDNSRecords(ClientIP).isNotNull &&
+ inputDNSRecords(ClientIP).notEqual("") &&
+ inputDNSRecords(ClientIP).notEqual("-") &&
+ ((inputDNSRecords(QueryClass).isNotNull &&
+ inputDNSRecords(QueryClass).notEqual("") &&
+ inputDNSRecords(QueryClass).notEqual("-")) ||
+ inputDNSRecords(QueryType).isNotNull ||
+ inputDNSRecords(QueryResponseCode).isNotNull)
+
+ inputDNSRecords
+ .filter(cleanDNSRecordsFilter)
+ .select(InSchema: _*)
+ .na.fill(DefaultQueryClass, Seq(QueryClass))
+ .na.fill(DefaultQueryType, Seq(QueryType))
+ .na.fill(DefaultQueryResponseCode, Seq(QueryResponseCode))
+ }
+
+ /**
+ *
+ * @param inputDNSRecords raw DNS records.
+ * @return
+ */
+ def filterAndSelectInvalidDNSRecords(inputDNSRecords: DataFrame): DataFrame ={
+
+ val invalidDNSRecordsFilter = inputDNSRecords(Timestamp).isNull ||
+ inputDNSRecords(Timestamp).equalTo("") ||
+ inputDNSRecords(Timestamp).equalTo("-") ||
+ inputDNSRecords(UnixTimestamp).isNull ||
+ inputDNSRecords(FrameLength).isNull ||
+ inputDNSRecords(QueryName).isNull ||
+ inputDNSRecords(QueryName).equalTo("") ||
+ inputDNSRecords(QueryName).equalTo("-") ||
+ inputDNSRecords(QueryName).equalTo("(empty)") ||
+ inputDNSRecords(ClientIP).isNull ||
+ inputDNSRecords(ClientIP).equalTo("") ||
+ inputDNSRecords(ClientIP).equalTo("-") ||
+ ((inputDNSRecords(QueryClass).isNull ||
+ inputDNSRecords(QueryClass).equalTo("") ||
+ inputDNSRecords(QueryClass).equalTo("-")) &&
+ inputDNSRecords(QueryType).isNull &&
+ inputDNSRecords(QueryResponseCode).isNull)
+
+ inputDNSRecords
+ .filter(invalidDNSRecordsFilter)
+ .select(InSchema: _*)
+ }
+
+ /**
+ *
+ * @param scoredDNSRecords scored DNS records.
+ * @param threshold score tolerance.
+ * @return
+ */
+ def filterScoredDNSRecords(scoredDNSRecords: DataFrame, threshold: Double): DataFrame ={
+
+ val filteredDNSRecordsFilter = scoredDNSRecords(Score).leq(threshold) &&
+ scoredDNSRecords(Score).gt(dataValidation.ScoreError)
+
+ scoredDNSRecords.filter(filteredDNSRecordsFilter)
+ }
+
+ /**
+ *
+ * @param scoredDNSRecords scored DNS records.
+ * @return
+ */
+ def filterAndSelectCorruptDNSRecords(scoredDNSRecords: DataFrame): DataFrame = {
+
+ val corruptDNSRecordsFilter = scoredDNSRecords(Score).equalTo(dataValidation.ScoreError)
+
+ scoredDNSRecords
+ .filter(corruptDNSRecordsFilter)
+ .select(OutSchema: _*)
+
+ }
+
+
+ val DefaultQueryClass = "unknown"
+ val DefaultQueryType = -1
+ val DefaultQueryResponseCode = -1
+
+ val InStructType = StructType(List(TimestampField, UnixTimestampField, FrameLengthField, ClientIPField,
+ QueryNameField, QueryClassField, QueryTypeField, QueryResponseCodeField))
+
+ val InSchema = InStructType.fieldNames.map(col)
+
+ assert(ModelSchema.fields.forall(InStructType.fields.contains(_)))
+
+ val OutSchema = StructType(
+ List(TimestampField,
+ UnixTimestampField,
+ FrameLengthField,
+ ClientIPField,
+ QueryNameField,
+ QueryClassField,
+ QueryTypeField,
+ QueryResponseCodeField,
+ ScoreField)).fieldNames.map(col)
+
}
\ No newline at end of file
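filterAndSelectInvalidDNSRecords above is the De Morgan negation of filterAndSelectCleanDNSRecords, so the two filters should partition any input. A quick sanity check, runnable as a sketch from spark-shell (the local-mode setup and parquet path are illustrative):

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spot.dns.DNSSuspiciousConnectsAnalysis

val sparkContext = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("dns-filter-partition-check"))
val sqlContext = new SQLContext(sparkContext)

val inputDNSRecords = sqlContext.read.parquet("hdfs:///tmp/dns") // hypothetical path
val clean = DNSSuspiciousConnectsAnalysis.filterAndSelectCleanDNSRecords(inputDNSRecords)
val invalid = DNSSuspiciousConnectsAnalysis.filterAndSelectInvalidDNSRecords(inputDNSRecords)

// Every row should satisfy exactly one of the two filters.
assert(clean.count() + invalid.count() == inputDNSRecords.count())

sparkContext.stop()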
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index 098a787..127b2a7 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -8,6 +8,7 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.netflow.FlowSchema._
import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
+import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}
/**
@@ -17,29 +18,33 @@ import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
object FlowSuspiciousConnectsAnalysis {
- def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
+ def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+ inputFlowRecords: DataFrame) = {
- logger.info("Loading data")
+ logger.info("Starting flow suspicious connects analysis.")
- val rawDataDF = sqlContext.read.parquet(config.inputPath)
- .filter(Hour + " BETWEEN 0 AND 23 AND " + Minute + " BETWEEN 0 AND 59 AND " + Second + " BETWEEN 0 AND 59")
- .select(inColumns: _*)
+ val cleanFlowRecords = filterAndSelectCleanFlowRecords(inputFlowRecords)
+ val scoredFlowRecords = detectFlowAnomalies(cleanFlowRecords, config, sparkContext, sqlContext, logger)
- logger.info("Training the model")
+ val filteredFlowRecords = filterScoredFlowRecords(scoredFlowRecords, config.threshold)
- val scoredDF = detectFlowAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
+ val orderedFlowRecords = filteredFlowRecords.orderBy(Score)
- val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
+ val mostSuspiciousFlowRecords =
+ if(config.maxResults > 0 ) orderedFlowRecords.limit(config.maxResults) else orderedFlowRecords
- val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
-
-
- val outputDF = mostSusipiciousDF.select(OutColumns: _*)
+ val outputFlowRecords = mostSuspiciousFlowRecords.select(OutSchema: _*)
logger.info("Netflow suspicious connects analysis completed.")
logger.info("Saving results to : " + config.hdfsScoredConnect)
- outputDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+ outputFlowRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+
+ val invalidFlowRecords = filterAndSelectInvalidFlowRecords(inputFlowRecords)
+ dataValidation.showAndSaveInvalidRecords(invalidFlowRecords, config.hdfsScoredConnect, logger)
+
+ val corruptFlowRecords = filterAndSelectCorruptFlowRecords(scoredFlowRecords)
+ dataValidation.showAndSaveCorruptRecords(corruptFlowRecords, config.hdfsScoredConnect, logger)
}
/**
@@ -67,7 +72,83 @@ object FlowSuspiciousConnectsAnalysis {
model.score(sparkContext, sqlContext, data)
}
- val inSchema = StructType(List(TimeReceivedField,
+ /**
+ *
+ * @param inputFlowRecords raw flow records
+ * @return
+ */
+ def filterAndSelectCleanFlowRecords(inputFlowRecords: DataFrame): DataFrame ={
+
+ val cleanFlowRecordsFilter = inputFlowRecords(Hour).between(0, 23) &&
+ inputFlowRecords(Minute).between(0, 59) &&
+ inputFlowRecords(Second).between(0, 59) &&
+ inputFlowRecords(TimeReceived).isNotNull &&
+ inputFlowRecords(SourceIP).isNotNull &&
+ inputFlowRecords(DestinationIP).isNotNull &&
+ inputFlowRecords(SourcePort).isNotNull &&
+ inputFlowRecords(DestinationPort).isNotNull &&
+ inputFlowRecords(Ibyt).isNotNull &&
+ inputFlowRecords(Ipkt).isNotNull
+
+ inputFlowRecords
+ .filter(cleanFlowRecordsFilter)
+ .select(InSchema: _*)
+
+ }
+
+ /**
+ *
+ * @param inputFlowRecords raw flow records.
+ * @return
+ */
+ def filterAndSelectInvalidFlowRecords(inputFlowRecords: DataFrame): DataFrame = {
+
+ val invalidFlowRecordsFilter = inputFlowRecords(Hour).between(0,23) &&
+ inputFlowRecords(Minute).between(0,59) &&
+ inputFlowRecords(Second).between(0,59) &&
+ inputFlowRecords(TimeReceived).isNull ||
+ inputFlowRecords(SourceIP).isNull ||
+ inputFlowRecords(DestinationIP).isNull ||
+ inputFlowRecords(SourcePort).isNull ||
+ inputFlowRecords(DestinationPort).isNull ||
+ inputFlowRecords(Ibyt).isNull ||
+ inputFlowRecords(Ipkt).isNull
+
+ inputFlowRecords
+ .filter(invalidFlowRecordsFilter)
+ .select(InSchema: _*)
+ }
+
+ /**
+ *
+ * @param scoredFlowRecords scored flow records.
+ * @param threshold score tolerance.
+ * @return
+ */
+ def filterScoredFlowRecords(scoredFlowRecords: DataFrame, threshold: Double): DataFrame = {
+
+ val filteredFlowRecordsFilter = scoredFlowRecords(Score).leq(threshold) &&
+ scoredFlowRecords(Score).gt(dataValidation.ScoreError)
+
+ scoredFlowRecords.filter(filteredFlowRecordsFilter)
+ }
+
+ /**
+ *
+ * @param scoredFlowRecords scored flow records.
+ * @return
+ */
+ def filterAndSelectCorruptFlowRecords(scoredFlowRecords: DataFrame): DataFrame = {
+
+ val corruptFlowRecordsFilter = scoredFlowRecords(Score).equalTo(dataValidation.ScoreError)
+
+ scoredFlowRecords
+ .filter(corruptFlowRecordsFilter)
+ .select(OutSchema: _*)
+
+ }
+
+ val InSchema = StructType(List(TimeReceivedField,
YearField,
MonthField,
DayField,
@@ -83,9 +164,7 @@ object FlowSuspiciousConnectsAnalysis {
IpktField,
IbytField,
OpktField,
- ObytField))
-
- val inColumns = inSchema.fieldNames.map(col)
+ ObytField)).fieldNames.map(col)
val OutSchema = StructType(
List(TimeReceivedField,
@@ -105,7 +184,6 @@ object FlowSuspiciousConnectsAnalysis {
IbytField,
OpktField,
ObytField,
- ScoreField))
+ ScoreField)).fieldNames.map(col)
- val OutColumns = OutSchema.fieldNames.map(col)
}
\ No newline at end of file
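One subtlety worth flagging in filterAndSelectInvalidFlowRecords above: in Scala, && binds more tightly than ||, so the Hour/Minute/Second range checks group only with TimeReceived.isNull instead of guarding the whole predicate, and unlike the DNS case this filter is not the exact complement of the clean filter. Assuming the complement is the intent, an explicitly parenthesized version would be:

// Sketch: explicit negation of the clean-flow predicate.
val invalidFlowRecordsFilter =
  !(inputFlowRecords(Hour).between(0, 23) &&
    inputFlowRecords(Minute).between(0, 59) &&
    inputFlowRecords(Second).between(0, 59)) ||
  inputFlowRecords(TimeReceived).isNull ||
  inputFlowRecords(SourceIP).isNull ||
  inputFlowRecords(DestinationIP).isNull ||
  inputFlowRecords(SourcePort).isNull ||
  inputFlowRecords(DestinationPort).isNull ||
  inputFlowRecords(Ibyt).isNull ||
  inputFlowRecords(Ipkt).isNull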
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/c638e2fa/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 38150ca..290f101 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -2,10 +2,12 @@ package org.apache.spot.proxy
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.proxy.ProxySchema._
-
+import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}
/**
* Run suspicious connections analysis on proxy data.
*/
@@ -19,27 +21,35 @@ object ProxySuspiciousConnectsAnalysis {
* @param sqlContext Spark SQL context.
* @param logger Logs execution progress, information and errors for user.
*/
- def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger) = {
+ def run(config: SuspiciousConnectsConfig, sparkContext: SparkContext, sqlContext: SQLContext, logger: Logger,
+ inputProxyRecords: DataFrame) = {
logger.info("Starting proxy suspicious connects analysis.")
- logger.info("Loading data from: " + config.inputPath)
+ val cleanProxyRecords = filterAndSelectCleanProxyRecords(inputProxyRecords)
+
+
+ val scoredProxyRecords = detectProxyAnomalies(cleanProxyRecords, config, sparkContext, sqlContext, logger)
+
+ // take the maxResults least probable events with probability below the threshold, then sort
+
+ val filteredProxyRecords = filterScoredProxyRecords(scoredProxyRecords, config.threshold)
- val rawDataDF = sqlContext.read.parquet(config.inputPath).
- filter(Date + " is not null and " + Time + " is not null and " + ClientIP + " is not null").
- select(Date, Time, ClientIP, Host, ReqMethod, UserAgent, ResponseContentType, Duration, UserName,
- WebCat, Referer, RespCode, URIPort, URIPath, URIQuery, ServerIP, SCBytes, CSBytes, FullURI)
+ val orderedProxyRecords = filteredProxyRecords.orderBy(Score)
- val scoredDF = detectProxyAnomalies(rawDataDF, config, sparkContext, sqlContext, logger)
+ val mostSuspiciousProxyRecords = if(config.maxResults > 0) orderedProxyRecords.limit(config.maxResults) else orderedProxyRecords
+ val outputProxyRecords = mostSuspiciousProxyRecords.select(OutSchema:_*)
- val filteredDF = scoredDF.filter(Score + " <= " + config.threshold)
- val mostSusipiciousDF: DataFrame = filteredDF.orderBy(Score).limit(config.maxResults)
+ logger.info("Proxy suspicious connects analysis completed")
+ logger.info("Saving results to: " + config.hdfsScoredConnect)
+ outputProxyRecords.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
- logger.info("Persisting data to hdfs: " + config.hdfsScoredConnect)
- mostSusipiciousDF.map(_.mkString(config.outputDelimiter)).saveAsTextFile(config.hdfsScoredConnect)
+ val invalidProxyRecords = filterAndSelectInvalidProxyRecords(inputProxyRecords)
+ dataValidation.showAndSaveInvalidRecords(invalidProxyRecords, config.hdfsScoredConnect, logger)
- logger.info("Proxy suspcicious connects completed")
+ val corruptProxyRecords = filterAndSelectCorruptProxyRecords(scoredProxyRecords)
+ dataValidation.showAndSaveCorruptRecords(corruptProxyRecords, config.hdfsScoredConnect, logger)
}
@@ -66,4 +76,118 @@ object ProxySuspiciousConnectsAnalysis {
model.score(sparkContext, data)
}
+
+ /**
+ *
+ * @param inputProxyRecords raw proxy records.
+ * @return
+ */
+ def filterAndSelectCleanProxyRecords(inputProxyRecords: DataFrame): DataFrame ={
+
+ val cleanProxyRecordsFilter = inputProxyRecords(Date).isNotNull &&
+ inputProxyRecords(Time).isNotNull &&
+ inputProxyRecords(ClientIP).isNotNull &&
+ inputProxyRecords(Host).isNotNull &&
+ inputProxyRecords(FullURI).isNotNull
+
+ inputProxyRecords
+ .filter(cleanProxyRecordsFilter)
+ .select(InSchema:_*)
+ .na.fill(DefaultUserAgent, Seq(UserAgent))
+ .na.fill(DefaultResponseContentType, Seq(ResponseContentType))
+ }
+
+ /**
+ *
+ * @param inputProxyRecords raw proxy records.
+ * @return
+ */
+ def filterAndSelectInvalidProxyRecords(inputProxyRecords: DataFrame): DataFrame ={
+
+ val invalidProxyRecordsFilter = inputProxyRecords(Date).isNull ||
+ inputProxyRecords(Time).isNull ||
+ inputProxyRecords(ClientIP).isNull ||
+ inputProxyRecords(Host).isNull ||
+ inputProxyRecords(FullURI).isNull
+
+ inputProxyRecords
+ .filter(invalidProxyRecordsFilter)
+ .select(InSchema: _*)
+ }
+
+ /**
+ *
+ * @param scoredProxyRecords scored proxy records.
+ * @param threshold score tolerance.
+ * @return
+ */
+ def filterScoredProxyRecords(scoredProxyRecords: DataFrame, threshold: Double): DataFrame ={
+
+ val filteredProxyRecordsFilter = scoredProxyRecords(Score).leq(threshold) &&
+ scoredProxyRecords(Score).gt(dataValidation.ScoreError)
+
+ scoredProxyRecords.filter(filteredProxyRecordsFilter)
+ }
+
+ /**
+ *
+ * @param scoredProxyRecords scored proxy records.
+ * @return
+ */
+ def filterAndSelectCorruptProxyRecords(scoredProxyRecords: DataFrame): DataFrame ={
+
+ val corruptProxyRecordsFilter = scoredProxyRecords(Score).equalTo(dataValidation.ScoreError)
+
+ scoredProxyRecords
+ .filter(corruptProxyRecordsFilter)
+ .select(OutSchema: _*)
+ }
+
+ val DefaultUserAgent = "-"
+ val DefaultResponseContentType = "-"
+
+ val InSchema = StructType(
+ List(DateField,
+ TimeField,
+ ClientIPField,
+ HostField,
+ ReqMethodField,
+ UserAgentField,
+ ResponseContentTypeField,
+ DurationField,
+ UserNameField,
+ WebCatField,
+ RefererField,
+ RespCodeField,
+ URIPortField,
+ URIPathField,
+ URIQueryField,
+ ServerIPField,
+ SCBytesField,
+ CSBytesField,
+ FullURIField)).fieldNames.map(col)
+
+ val OutSchema = StructType(
+ List(DateField,
+ TimeField,
+ ClientIPField,
+ HostField,
+ ReqMethodField,
+ UserAgentField,
+ ResponseContentTypeField,
+ DurationField,
+ UserNameField,
+ WebCatField,
+ RefererField,
+ RespCodeField,
+ URIPortField,
+ URIPathField,
+ URIQueryField,
+ ServerIPField,
+ SCBytesField,
+ CSBytesField,
+ FullURIField,
+ WordField,
+ ScoreField)).fieldNames.map(col)
+
}
\ No newline at end of file
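All three analyses now share the same scoring convention: records that cannot be scored are flagged with the sentinel dataValidation.ScoreError, and the suspicious set keeps scores in the interval (ScoreError, threshold]. That shared shape could be factored out; here is a sketch (the helper name and signature are assumptions, not repo code):

import org.apache.spark.sql.DataFrame
import org.apache.spot.utilities.data.validation.{InvalidDataHandler => dataValidation}

// Sketch: generic threshold filter honoring the ScoreError sentinel.
def filterScoredRecords(scoredRecords: DataFrame, scoreColumn: String, threshold: Double): DataFrame =
  scoredRecords.filter(
    scoredRecords(scoreColumn).leq(threshold) &&
      scoredRecords(scoreColumn).gt(dataValidation.ScoreError))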