You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spot.apache.org by na...@apache.org on 2017/09/26 22:41:12 UTC

[09/50] [abbrv] incubator-spot git commit: SPOT-166 Schema validation: Made changes after code review from @NathanSegerlind - ValidateSchema will return case class with flag isValid and Seq[String] for a list of invalid columns. Changed flow, dns and pro

SPOT-166 Schema validation:
Made changes after code review from @NathanSegerlind
- validateSchema now returns a case class (InputSchemaValidationResponse) with an isValid flag and a Seq[String] listing the invalid columns.
- Changed the flow, DNS and proxy pipelines to handle the InputSchemaValidationResponse returned by validateSchema.
- Updated unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/3cab5ddd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/3cab5ddd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/3cab5ddd

Branch: refs/heads/SPOT-181_ODM
Commit: 3cab5ddd3987bf7ce1a24a6d4b67d597183f888e
Parents: 9c537a7
Author: Ricardo Barona <ri...@intel.com>
Authored: Fri Jun 30 13:40:42 2017 -0500
Committer: Ricardo Barona <ri...@intel.com>
Committed: Thu Jul 27 12:51:03 2017 -0500

----------------------------------------------------------------------
 .../spot/dns/DNSSuspiciousConnectsAnalysis.scala       |  9 +++++----
 .../spot/netflow/FlowSuspiciousConnectsAnalysis.scala  |  9 +++++----
 .../spot/proxy/ProxySuspiciousConnectsAnalysis.scala   |  9 +++++----
 .../spot/utilities/data/validation/InputSchema.scala   | 13 ++++++++-----
 .../spot/dns/DNSSuspiciousConnectsAnalysisTest.scala   |  8 ++++----
 .../netflow/FlowSuspiciousConnectsAnalysisTest.scala   |  8 ++++----
 .../proxy/ProxySuspiciousConnectsAnalysisTest.scala    |  8 ++++----
 .../utilities/data/validation/InputSchemaTest.scala    | 12 ++++++------
 8 files changed, 41 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
index e0c8068..acf8fc6 100644
--- a/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysis.scala
@@ -26,6 +26,7 @@ import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.dns.DNSSchema._
 import org.apache.spot.dns.model.DNSSuspiciousConnectsModel
 import org.apache.spot.proxy.ProxySchema.Score
+import org.apache.spot.utilities.data.validation.InputSchema.InputSchemaValidationResponse
 import org.apache.spot.utilities.data.validation.{InputSchema, InvalidDataHandler => dataValidation}
 
 /**
@@ -68,10 +69,10 @@ object DNSSuspiciousConnectsAnalysis {
     logger.info("Starting DNS suspicious connects analysis.")
 
     logger.info("Validating schema...")
-    val schemaValidationResults = validateSchema(inputDNSRecords)
+    val InputSchemaValidationResponse(isValid, errorMessages) = validateSchema(inputDNSRecords)
 
-    if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
-      schemaValidationResults.foreach(logger.error(_))
+    if (!isValid) {
+      errorMessages.foreach(logger.error(_))
       None
     } else {
       val dnsRecords = filterRecords(inputDNSRecords)
@@ -183,7 +184,7 @@ object DNSSuspiciousConnectsAnalysis {
     * @param inputDNSRecords incoming data frame
     * @return
     */
-  def validateSchema(inputDNSRecords: DataFrame): Seq[String] = {
+  def validateSchema(inputDNSRecords: DataFrame): InputSchemaValidationResponse = {
 
     InputSchema.validate(inputDNSRecords.schema, DNSSuspiciousConnectsModel.ModelSchema)
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
index a355cfb..b2d1304 100644
--- a/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.netflow.FlowSchema._
 import org.apache.spot.netflow.model.FlowSuspiciousConnectsModel
+import org.apache.spot.utilities.data.validation.InputSchema.InputSchemaValidationResponse
 import org.apache.spot.utilities.data.validation.{InputSchema, InvalidDataHandler => dataValidation}
 
 
@@ -88,10 +89,10 @@ object FlowSuspiciousConnectsAnalysis {
     logger.info("Starting flow suspicious connects analysis.")
 
     logger.info("Validating schema...")
-    val schemaValidationResults = validateSchema(inputFlowRecords)
+    val InputSchemaValidationResponse(isValid, errorMessages) = validateSchema(inputFlowRecords)
 
-    if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
-      schemaValidationResults.foreach(logger.error(_))
+    if (!isValid) {
+      errorMessages.foreach(logger.error(_))
       None
 
     } else {
@@ -191,7 +192,7 @@ object FlowSuspiciousConnectsAnalysis {
     * @param inputFlowRecords incoming data frame
     * @return
     */
-  def validateSchema(inputFlowRecords: DataFrame): Seq[String] = {
+  def validateSchema(inputFlowRecords: DataFrame): InputSchemaValidationResponse = {
 
     InputSchema.validate(inputFlowRecords.schema, FlowSuspiciousConnectsModel.ModelSchema)
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
index 0fa2fd2..ea0b90a 100644
--- a/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysis.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spot.SuspiciousConnects.SuspiciousConnectsAnalysisResults
 import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
 import org.apache.spot.proxy.ProxySchema._
+import org.apache.spot.utilities.data.validation.InputSchema.InputSchemaValidationResponse
 import org.apache.spot.utilities.data.validation.{InputSchema, InvalidDataHandler => dataValidation}
 
 /**
@@ -89,10 +90,10 @@ object ProxySuspiciousConnectsAnalysis {
     logger.info("Starting proxy suspicious connects analysis.")
 
     logger.info("Validating schema...")
-    val schemaValidationResults = validateSchema(inputProxyRecords)
+    val InputSchemaValidationResponse(isValid, errorMessages) = validateSchema(inputProxyRecords)
 
-    if (schemaValidationResults.length > InputSchema.ResponseDefaultSize) {
-      schemaValidationResults.foreach(logger.error(_))
+    if (!isValid) {
+      errorMessages.foreach(logger.error(_))
 
       None
     } else {
@@ -180,7 +181,7 @@ object ProxySuspiciousConnectsAnalysis {
     * @param inputProxyRecords incoming data frame
     * @return
     */
-  def validateSchema(inputProxyRecords: DataFrame): Seq[String] = {
+  def validateSchema(inputProxyRecords: DataFrame): InputSchemaValidationResponse = {
 
     InputSchema.validate(inputProxyRecords.schema, ProxySuspiciousConnectsModel.ModelSchema)
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
index 6f0df78..997d992 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/data/validation/InputSchema.scala
@@ -10,9 +10,6 @@ import scala.collection.mutable.ListBuffer
   */
 object InputSchema {
 
-  // Response from validate will contains always at least 1 element. 1 message means no errors.
-  val ResponseDefaultSize = 1
-
   /**
     * Validate the incoming data schema matches the schema required for model creation and scoring.
     *
@@ -20,7 +17,7 @@ object InputSchema {
     * @param expectedSchema schema expected by model training and scoring methods
     * @return
     */
-  def validate(inSchema: StructType, expectedSchema: StructType): Seq[String] = {
+  def validate(inSchema: StructType, expectedSchema: StructType): InputSchemaValidationResponse = {
     val response: ListBuffer[String] = ListBuffer("Schema not compatible:")
 
     // reduce schema from struct field to only field name and type
@@ -43,6 +40,12 @@ object InputSchema {
       }
       })
 
-    response
+    response.length match {
+      case 1 => InputSchemaValidationResponse(isValid = true, Seq())
+      case _ => InputSchemaValidationResponse(isValid = false, response)
+    }
   }
+
+  case class InputSchemaValidationResponse(final val isValid: Boolean, final val errorMessages: Seq[String])
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
index a940ea6..5aafbb2 100644
--- a/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/dns/DNSSuspiciousConnectsAnalysisTest.scala
@@ -342,23 +342,23 @@ class DNSSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec with
     scoredDNSRecords.count should be(2)
   }
 
-  "validateSchema" should "return a Seq[String] of size > 1 when passing an invalid schema" in {
+  "validateSchema" should "return false when passing an invalid schema" in {
     val anomalousRecord = DNSInvalidSchema("May 20 2016 02:10:25.970987000 PDT", 1463735425d, 1, "172.16.9.132",
       "122.2o7.turner.com", "0x00000001", "null", "null")
     val data = sparkSession.createDataFrame(Seq(anomalousRecord, anomalousRecord, anomalousRecord, anomalousRecord, anomalousRecord))
 
     val results = DNSSuspiciousConnectsAnalysis.validateSchema(data)
 
-    results.length should be > 1
+    results.isValid shouldBe false
   }
 
-  it should "return a Seq[String] of size 1 when passing a valid schema" in {
+  it should "return true when passing a valid schema" in {
     val typicalRecord = DNSInput("May 20 2016 02:10:25.970987000 PDT", 1463735425L, 168, "172.16.9.132", "122.2o7.turner.com", "0x00000001", 1, 0)
     val data = sparkSession.createDataFrame(Seq(typicalRecord, typicalRecord, typicalRecord, typicalRecord))
 
     val results = DNSSuspiciousConnectsAnalysis.validateSchema(data)
 
-    results.length shouldBe 1
+    results.isValid shouldBe true
   }
 
   def testDNSRecords = new {

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
index 7a15295..f7bdb12 100644
--- a/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/netflow/FlowSuspiciousConnectsAnalysisTest.scala
@@ -292,7 +292,7 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
     scoredFlowRecords.count should be(2)
   }
 
-  "validateSchema" should "return a Seq[String] of size > 1 when passing an invalid schema" in {
+  "validateSchema" should "return false when passing an invalid schema" in {
     val anomalousRecord = FlowInvalidRecord("2016-05-05 00:11:01", 2016, 5, 5, 0, 0, 1, 972, "172.16.0.129", "10.0" +
       ".2.202", 1024, 80, "TCP", "39l", "12522l", "0l", "0l")
 
@@ -300,10 +300,10 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
 
     val results = FlowSuspiciousConnectsAnalysis.validateSchema(data)
 
-    results.length should be > 1
+    results.isValid shouldBe false
   }
 
-  it should "return a Seq[String] of size 1 when passing a valid schema" in {
+  it should "return true when passing a valid schema" in {
     val typicalRecord = FlowRecord("2016-05-05 13:54:58", 2016, 5, 5, 13, 54, 58, 0.972f, "172.16.0.129", "10.0.2" +
       ".202", 1024, 80, "TCP", 39, 12522, 0, 0)
 
@@ -312,7 +312,7 @@ class FlowSuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wit
 
     val results = FlowSuspiciousConnectsAnalysis.validateSchema(data)
 
-
+    results.isValid shouldBe true
   }
 
   def testFlowRecords = new {

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
index 8ec75f2..d8e4061 100644
--- a/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/proxy/ProxySuspiciousConnectsAnalysisTest.scala
@@ -282,7 +282,7 @@ class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wi
 
   }
 
-  "validateSchema" should "return a Seq[String] of size > 1 when passing an invalid schema" in {
+  "validateSchema" should "return false when passing an invalid schema" in {
     val anomalousRecord = ProxyInvalidInput("2016-10-03", "04:57:36", "127.0.0.1", 0.0d, "PUT",
       "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
       1, 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
@@ -295,11 +295,11 @@ class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wi
 
     val results = ProxySuspiciousConnectsAnalysis.validateSchema(data)
 
-    results.length should be > 1
+    results.isValid shouldBe false
 
   }
 
-  it should "return a Seq[String] of size 1 when passing a valid schema" in {
+  it should "return true when passing a valid schema" in {
     val typicalRecord = ProxyInput("2016-10-03", "04:57:36", "127.0.0.1", "maw.bronto.com", "PUT",
       "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
       "text/plain", 230, "-", "Technology/Internet", "http://www.spoonflower.com/tags/color", "202", 80,
@@ -311,7 +311,7 @@ class ProxySuspiciousConnectsAnalysisTest extends TestingSparkContextFlatSpec wi
 
     val results = ProxySuspiciousConnectsAnalysis.validateSchema(data)
 
-    results.length shouldBe 1
+    results.isValid shouldBe true
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3cab5ddd/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala b/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
index 13611d1..52a3447 100644
--- a/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/utilities/data/validation/InputSchemaTest.scala
@@ -5,7 +5,7 @@ import org.scalatest.{FlatSpec, Matchers}
 
 class InputSchemaTest extends FlatSpec with Matchers {
 
-  "validate" should "return a Seq[String] of size 1 when incoming schema is valid" in {
+  "validate" should "return true when incoming schema is valid" in {
 
     val incomingSchema = StructType(List(StructField("ip", StringType),
       StructField("ibyt", LongType),
@@ -18,10 +18,10 @@ class InputSchemaTest extends FlatSpec with Matchers {
 
     val results = InputSchema.validate(incomingSchema, modelSchema)
 
-    results.length shouldBe 1
+    results.isValid shouldBe true
   }
 
-  it should "return a Seq[String] of size > 1 when incoming schema is not valid due to type mismatch" in {
+  it should "return false when incoming schema is not valid due to type mismatch" in {
     val incomingSchema = StructType(List(StructField("ip", StringType),
       StructField("ibyt", StringType),
       StructField("host", IntegerType),
@@ -33,10 +33,10 @@ class InputSchemaTest extends FlatSpec with Matchers {
 
     val results = InputSchema.validate(incomingSchema, modelSchema)
 
-    results.length shouldBe 3
+    results.isValid shouldBe false
   }
 
-  it should "return a Seq[String] of size > 1 when incoming schema is not valid due to required field is missing" in {
+  it should "return false when incoming schema is not valid due to required field is missing" in {
 
     val incomingSchema = StructType(List(StructField("ip", StringType),
       StructField("ibyt", LongType),
@@ -48,6 +48,6 @@ class InputSchemaTest extends FlatSpec with Matchers {
 
     val results = InputSchema.validate(incomingSchema, modelSchema)
 
-    results.length shouldBe 2
+    results.isValid shouldBe false
   }
 }