You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2017/09/26 22:41:12 UTC
[09/50] [abbrv] incubator-spot git commit: SPOT-166 Schema
validation: Made changes after code review from @NathanSegerlind -
validateSchema will return a case class with an isValid flag and a Seq[String] of
error messages for the invalid columns. Changed flow, dns and proxy pipelines
SPOT-166 Schema validation:
Made changes after code review from @NathanSegerlind
- validateSchema now returns a case class, InputSchemaValidationResponse, containing an isValid flag and a Seq[String] of error messages that describe the invalid columns.
- Changed the flow, DNS and proxy pipelines to handle the InputSchemaValidationResponse returned by validateSchema.
- Updated unit tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/3cab5ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/3cab5ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/3cab5ddd
Branch: refs/heads/SPOT-181_ODM
Commit: 3cab5ddd3987bf7ce1a24a6d4b67d597183f888e
Parents: 9c537a7
Author: Ricardo Barona <ri...@intel.com>
Authored: Fri Jun 30 13:40:42 2017 -0500
Committer: Ricardo Barona <ri...@intel.com>
Committed: Thu Jul 27 12:51:03 2017 -0500
----------------------------------------------------------------------
.../spot/dns/DNSSuspiciousConnectsAnalysis.scala | 9 +++++----
.../spot/netflow/FlowSuspiciousConnectsAnalysis.scala | 9 +++++----
.../spot/proxy/ProxySuspiciousConnectsAnalysis.scala | 9 +++++----
.../spot/utilities/data/validation/InputSchema.scala | 13 ++++++++-----
.../spot/dns/DNSSuspiciousConnectsAnalysisTest.scala | 8 ++++----
.../netflow/FlowSuspiciousConnectsAnalysisTest.scala | 8 ++++----
.../proxy/ProxySuspiciousConnectsAnalysisTest.scala | 8 ++++----
.../utilities/data/validation/InputSchemaTest.scala | 12 ++++++------
8 files changed, 41 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index e0c8068..acf8fc6 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -26,6 +26,7 @@ import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.dns.DNSSchema._
import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
import org.apache.spot.proxy.ProxySchema.Score
+import org.apache.spot.utilities.data.validation.InputSchema.InputSchemaValidationResponse
import org.apache.spot.utilities.data.validation.{InputSchema, InvalidDataHandler => dataValidation}
/**
@@ -68,10 +69,10 @@ object DNSSuspiciousConnectsAnalysis {
logger.info("Starting DNS suspicious connects analysis.")
logger.info("Validating schema...")
- val schemaValidationResults = validateSchema(inputDNSRecords)
+ val InputSchemaValidationResponse(isValid, errorMessages) = validateSchema(inputDNSRecords)
- if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
- schemaValidationResults.foreach(logger.error(_))
+ if (!isValid) {
+ errorMessages.foreach(logger.error(_))
None
} else {
val dnsRecords = filterRecords(inputDNSRecords)
@@ -183,7 +184,7 @@ object DNSSuspiciousConnectsAnalysis {
* @param inputDNSRecords incoming data frame
* @return
*/
- def validateSchema(inputDNSRecords: DataFrame): Seq[String] = {
+ def validateSchema(inputDNSRecords: DataFrame): InputSchemaValidationResponse = {
InputSchema.validate(inputDNSRecords.schema, DNSSuspiciousConnectsModel.ModelSchema)
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index a355cfb..b2d1304 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.netflow.FlowSchema._
import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
+import org.apache.spot.utilities.data.validation.InputSchema.InputSchemaValidationResponse
import org.apache.spot.utilities.data.validation.{InputSchema, InvalidDataHandler => dataValidation}
@@ -88,10 +89,10 @@ object FlowSuspiciousConnectsAnalysis {
logger.info("Starting flow suspicious connects analysis.")
logger.info("Validating schema...")
- val schemaValidationResults = validateSchema(inputFlowRecords)
+ val InputSchemaValidationResponse(isValid, errorMessages) = validateSchema(inputFlowRecords)
- if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
- schemaValidationResults.foreach(logger.error(_))
+ if (!isValid) {
+ errorMessages.foreach(logger.error(_))
None
} else {
@@ -191,7 +192,7 @@ object FlowSuspiciousConnectsAnalysis {
* @param inputFlowRecords incoming data frame
* @return
*/
- def validateSchema(inputFlowRecords: DataFrame): Seq[String] = {
+ def validateSchema(inputFlowRecords: DataFrame): InputSchemaValidationResponse = {
InputSchema.validate(inputFlowRecords.schema, FlowSuspiciousConnectsModel.ModelSchema)
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 0fa2fd2..ea0b90a 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
import org.apache.spot.proxy.ProxySchema._
+import org.apache.spot.utilities.data.validation.InputSchema.InputSchemaValidationResponse
import org.apache.spot.utilities.data.validation.{InputSchema, InvalidDataHandler => dataValidation}
/**
@@ -89,10 +90,10 @@ object ProxySuspiciousConnectsAnalysis {
logger.info("Starting proxy suspicious connects analysis.")
logger.info("Validating schema...")
- val schemaValidationResults = validateSchema(inputProxyRecords)
+ val InputSchemaValidationResponse(isValid, errorMessages) = validateSchema(inputProxyRecords)
- if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
- schemaValidationResults.foreach(logger.error(_))
+ if (!isValid) {
+ errorMessages.foreach(logger.error(_))
None
} else {
@@ -180,7 +181,7 @@ object ProxySuspiciousConnectsAnalysis {
* @param inputProxyRecords incoming data frame
* @return
*/
- def validateSchema(inputProxyRecords: DataFrame): Seq[String] = {
+ def validateSchema(inputProxyRecords: DataFrame): InputSchemaValidationResponse = {
InputSchema.validate(inputProxyRecords.schema, ProxySuspiciousConnectsModel.ModelSchema)
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
index 6f0df78..997d992 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
@@ -10,9 +10,6 @@ import scala.collection.mutable.ListBuffer
*/
object InputSchema {
- // Response from validate will contains always at least 1 element. 1 message means no errors.
- val ResponseDefaultSize = 1
-
/**
* Validate the incoming data schema matches the schema required for model creation and scoring.
*
@@ -20,7 +17,7 @@ object InputSchema {
* @param expectedSchema schema expected by model training and scoring methods
* @return
*/
- def validate(inSchema: StructType, expectedSchema: StructType): Seq[String] = {
+ def validate(inSchema: StructType, expectedSchema: StructType): InputSchemaValidationResponse = {
val response: ListBuffer[String] = ListBuffer("Schema not compatible:")
// reduce schema from struct field to only field name and type
@@ -43,6 +40,12 @@ object InputSchema {
}
})
- response
+ response.length match {
+ case 1 => InputSchemaValidationResponse(isValid = true, Seq())
+ case _ => InputSchemaValidationResponse(isValid = false, response)
+ }
}
+
+ case class InputSchemaValidationResponse(final val isValid: Boolean, final val errorMessages: Seq[String])
+
}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
index a940ea6..5aafbb2 100644
--- a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -342,23 +342,23 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
scoredDNSRecords.count should be(2)
}
- "validateSchema" should "return a Seq[String] of size > 1 when passing an invalid schema" in {
+ "validateSchema" should "return false when passing an invalid schema" in {
val anomalousRecord = DNSInvalidSchema("May 20 2016 02:10:25.970987000 PDT", 1463735425d, 1, "172.16.9.132",
"122.2o7.turner.com", "0x00000001", "null", "null")
val data = sparkSession.createDataFrame(Seq(anomalousRecord, anomalousRecord, anomalousRecord, anomalousRecord, anomalousRecord))
val results = DNSSuspiciousConnectsAnalysis.validateSchema(data)
- results.length should be > 1
+ results.isValid shouldBe false
}
- it should "return a Seq[String] of size 1 when passing a valid schema" in {
+ it should "return true when passing a valid schema" in {
val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
val data = sparkSession.createDataFrame(Seq(typicalRecord, typicalRecord, typicalRecord, typicalRecord))
val results = DNSSuspiciousConnectsAnalysis.validateSchema(data)
- results.length shouldBe 1
+ results.isValid shouldBe true
}
def testDNSRecords = new {
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
index 7a15295..f7bdb12 100644
--- a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
@@ -292,7 +292,7 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
scoredFlowRecords.count should be(2)
}
- "validateSchema" should "return a Seq[String] of size > 1 when passing an invalid schema" in {
+ "validateSchema" should "return false when passing an invalid schema" in {
val anomalousRecord = FlowInvalidRecord("2016-05-05 00:11:01", 2016, 5, 5, 0, 0, 1, 972, "172.16.0.129", "10.0" +
".2.202", 1024, 80, "TCP", "39l", "12522l", "0l", "0l")
@@ -300,10 +300,10 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
val results = FlowSuspiciousConnectsAnalysis.validateSchema(data)
- results.length should be > 1
+ results.isValid shouldBe false
}
- it should "return a Seq[String] of size 1 when passing a valid schema" in {
+ it should "return true when passing a valid schema" in {
val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972f, "172.16.0.129", "10.0.2" +
".202", 1024, 80, "TCP", 39, 12522, 0, 0)
@@ -312,7 +312,7 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
val results = FlowSuspiciousConnectsAnalysis.validateSchema(data)
-
+ results.isValid shouldBe true
}
def testFlowRecords = new {
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
index 8ec75f2..d8e4061 100644
--- a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
@@ -282,7 +282,7 @@ class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wi
}
- "validateSchema" should "return a Seq[String] of size > 1 when passing an invalid schema" in {
+ "validateSchema" should "return false when passing an invalid schema" in {
val anomalousRecord = ProxyInvalidInput("2016-10-03", "04:57:36", "127.0.0.1", 0.0d, "PUT",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
1, 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
@@ -295,11 +295,11 @@ class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wi
val results = ProxySuspiciousConnectsAnalysis.validateSchema(data)
- results.length should be > 1
+ results.isValid shouldBe false
}
- it should "return a Seq[String] of size 1 when passing a valid schema" in {
+ it should "return true when passing a valid schema" in {
val typicalRecord = ProxyInput("2016-10-03", "04:57:36", "127.0.0.1", "maw.bronto.com", "PUT",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
"text/plain", 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
@@ -311,7 +311,7 @@ class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wi
val results = ProxySuspiciousConnectsAnalysis.validateSchema(data)
- results.length shouldBe 1
+ results.isValid shouldBe true
}
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala b/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
index 13611d1..52a3447 100644
--- a/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
@@ -5,7 +5,7 @@ import org.scalatest.{FlatSpec, Matchers}
class InputSchemaTest extends FlatSpec with Matchers {
- "validate" should "return a Seq[String] of size 1 when incoming schema is valid" in {
+ "validate" should "return true when incoming schema is valid" in {
val incomingSchema = StructType(List(StructField("ip", StringType),
StructField("ibyt", LongType),
@@ -18,10 +18,10 @@ class InputSchemaTest extends FlatSpec with Matchers {
val results = InputSchema.validate(incomingSchema, modelSchema)
- results.length shouldBe 1
+ results.isValid shouldBe true
}
- it should "return a Seq[String] of size > 1 when incoming schema is not valid due to type mismatch" in {
+ it should "return false when incoming schema is not valid due to type mismatch" in {
val incomingSchema = StructType(List(StructField("ip", StringType),
StructField("ibyt", StringType),
StructField("host", IntegerType),
@@ -33,10 +33,10 @@ class InputSchemaTest extends FlatSpec with Matchers {
val results = InputSchema.validate(incomingSchema, modelSchema)
- results.length shouldBe 3
+ results.isValid shouldBe false
}
- it should "return a Seq[String] of size > 1 when incoming schema is not valid due to required field is missing" in {
+ it should "return false when incoming schema is not valid due to required field is missing" in {
val incomingSchema = StructType(List(StructField("ip", StringType),
StructField("ibyt", LongType),
@@ -48,6 +48,6 @@ class InputSchemaTest extends FlatSpec with Matchers {
val results = InputSchema.validate(incomingSchema, modelSchema)
- results.length shouldBe 2
+ results.isValid shouldBe false
}
}