Posted to commits@spot.apache.org by na...@apache.org on 2018/02/09 17:43:47 UTC
[2/7] incubator-spot git commit: Add options support to allow either --input or --database, fixed and added tests.
Add options support to allow either --input or --database, fixed and added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/579be080
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/579be080
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/579be080
Branch: refs/heads/SPOT-181_ODM
Commit: 579be080f70352fd088a638b5cf09e6f52fb8890
Parents: 65755c2
Author: Curtis Howard <cu...@curtis-MBP.local>
Authored: Thu Jan 25 09:28:25 2018 -0500
Committer: Curtis Howard <cu...@curtis-MBP.local>
Committed: Thu Jan 25 09:28:25 2018 -0500
----------------------------------------------------------------------
.../org/apache/spot/SuspiciousConnects.scala | 46 ++++---
.../spot/SuspiciousConnectsArgumentParser.scala | 47 ++++---
.../utilities/data/InputOutputDataHandler.scala | 44 +++++-
.../SuspiciousConnectsArgumentParserTest.scala | 138 ++++++++++++++++++-
4 files changed, 226 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
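For illustration, the parser now accepts exactly one of two input modes. A quick sketch of both argument sets (values are illustrative and mirror the tests added at the end of this commit; this snippet is not part of the commit itself):

  // Mode 1: read parquet directly from an HDFS path
  val hdfsArgs = Array("--analysis", "dns",
    "--input", "user/spot-data")

  // Mode 2: read from a Hive database/table, selecting the y/m/d partition
  val hiveArgs = Array("--analysis", "dns",
    "--database", "testdb",
    "--datatable", "testtable",
    "--year", "2017",
    "--month", "11",
    "--day", "1")

Supplying both --input and --database (or neither) now fails the parser's checkConfig validation, as exercised by the new tests.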
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
index 32b7ca4..f47c3e9 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnects.scala
@@ -63,32 +63,30 @@ object SuspiciousConnects {
.enableHiveSupport()
.getOrCreate()
- /*
- val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sparkSession, config.inputPath, logger)
- .getOrElse(sparkSession.emptyDataFrame)
- if(inputDataFrame.rdd.isEmpty()) {
- logger.error("Couldn't read data from location " + config.inputPath +", please verify it's a valid location and that " +
- s"contains parquet files with a given schema and try again.")
- System.exit(0)
- }
- */
-
- val hive_query = "SELECT * FROM " + config.database + "." + config.dataTable + " where (y=" + config.year + " and m=" + config.month + " and d=" + config.day + ")"
+ val inputQuery = "SELECT * FROM " + config.database + "." + config.dataTable + " where (y=" +
+ config.year + " and m=" + config.month + " and d=" + config.day + ")"
+
+ val inputDataFrame = if (config.database.trim.nonEmpty) {
+ InputOutputDataHandler.getInputDataFrame(sparkSession, inputQuery, logger, true)
+ .getOrElse(sparkSession.emptyDataFrame)
+ } else {
+ InputOutputDataHandler.getInputDataFrame(sparkSession, config.inputPath, logger, false)
+ .getOrElse(sparkSession.emptyDataFrame)
+ }
- val inputDataFrame = InputOutputDataHandler.getInputDataFrame(sparkSession, hive_query, logger)
- .getOrElse(sparkSession.emptyDataFrame)
if(inputDataFrame.rdd.isEmpty()) {
- logger.error("No records returned for Hive query " + hive_query +", please verify that data exists or issues with Hive connection.")
+ val dataFramePath = if (config.database.trim.nonEmpty) " query=(" + inputQuery + ")" else " path=(hdfs: " + config.inputPath + ")"
+ logger.error("No records returned for" + dataFramePath +", please verify that data and/or connectivity is available")
System.exit(0)
}
val results: Option[SuspiciousConnectsAnalysisResults] = analysis match {
- case "flow" => Some(FlowSuspiciousConnectsAnalysis.run(config, sparkSession, logger,
- inputDataFrame))
- case "dns" => Some(DNSSuspiciousConnectsAnalysis.run(config, sparkSession, logger,
- inputDataFrame))
- case "proxy" => Some(ProxySuspiciousConnectsAnalysis.run(config, sparkSession, logger,
- inputDataFrame))
+ case "flow" => FlowSuspiciousConnectsAnalysis.run(config, sparkSession, logger,
+ inputDataFrame)
+ case "dns" => DNSSuspiciousConnectsAnalysis.run(config, sparkSession, logger,
+ inputDataFrame)
+ case "proxy" => ProxySuspiciousConnectsAnalysis.run(config, sparkSession, logger,
+ inputDataFrame)
case _ => None
}
@@ -115,7 +113,11 @@ object SuspiciousConnects {
InvalidDataHandler.showAndSaveInvalidRecords(invalidRecords, config.hdfsScoredConnect, logger)
}
- case None => logger.error("Unsupported (or misspelled) analysis: " + analysis)
+ case None => logger.error("Something went wrong while trying to run Suspicious Connects Analysis")
+ logger.error(s"Is the value of the analysis parameter (provided: $analysis) one of the valid analysis " +
+ s"types flow/dns/proxy?")
+ logger.error("If the analysis type is correct, please check for other errors such as a schema mismatch or " +
+ "bad Spark parameters")
}
sparkSession.stop()
@@ -134,4 +136,4 @@ object SuspiciousConnects {
case class SuspiciousConnectsAnalysisResults(val suspiciousConnects: DataFrame, val invalidRecords: DataFrame)
-}
\ No newline at end of file
+}
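Condensed, the input-selection logic introduced above amounts to the following sketch (hypothetical function and object names; assumes an existing SparkSession built with Hive support, as in the surrounding code):

  import org.apache.spark.sql.{DataFrame, SparkSession}

  object InputSelectionSketch {
    // When --database is set, read through a Hive query over the y/m/d
    // partition columns; otherwise read parquet from the --input HDFS path.
    def loadInput(spark: SparkSession,
                  database: String, dataTable: String,
                  year: Int, month: Int, day: Int,
                  inputPath: String): DataFrame =
      if (database.trim.nonEmpty) {
        val query = s"SELECT * FROM $database.$dataTable " +
          s"where (y=$year and m=$month and d=$day)"
        spark.sql(query)
      } else {
        spark.read.parquet(inputPath)
      }
  }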
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
index 7661cbe..49cbeab 100644
--- a/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/SuspiciousConnectsArgumentParser.scala
@@ -33,29 +33,29 @@ object SuspiciousConnectsArgumentParser {
action((x, c) => c.copy(analysis = x)).
text("choice of suspicious connections analysis to perform")
- opt[String]("input").required().valueName("<hdfs path>").
+ opt[String]("input").optional().valueName("<hdfs path>").
action((x, c) => c.copy(inputPath = x)).
text("HDFS path to input")
- opt[String]("database").required().valueName("<database name>").
+ opt[String]("database").optional().valueName("<database name>").
action((x, c) => c.copy(database = x)).
- text("Database name")
+ text("Database name").children(
+ opt[String]("datatable").required().valueName("<source table name>").
+ action((x, c) => c.copy(dataTable = x)).
+ text("hive table name"),
- opt[String]("datatable").required().valueName("<source table name>").
- action((x, c) => c.copy(dataTable = x)).
- text("hive table name")
+ opt[Int]("year").required().valueName("<input year>").
+ action((x, c) => c.copy(year = x)).
+ text("input year"),
- opt[String]("year").required().valueName("<input year>").
- action((x, c) => c.copy(year = x)).
- text("input year")
+ opt[Int]("month").required().valueName("<input month>").
+ action((x, c) => c.copy(month = x)).
+ text("input month"),
- opt[String]("month").required().valueName("<input month>").
- action((x, c) => c.copy(month = x)).
- text("input month")
-
- opt[String]("day").required().valueName("<input day>").
- action((x, c) => c.copy(day = x)).
- text("input day")
+ opt[Int]("day").required().valueName("<input day>").
+ action((x, c) => c.copy(day = x)).
+ text("input day")
+ )
opt[String]("feedback").valueName("<local file>").
action((x, c) => c.copy(feedbackFile = x)).
@@ -116,15 +116,24 @@ object SuspiciousConnectsArgumentParser {
opt[String]("ldaoptimizer").optional().valueName("lda optimizer").
action((x, c) => c.copy(ldaOptimizer = x)).
text("LDA Optimizer: em for EM Optimizer or online for Online Optimizer")
+
+ checkConfig { params =>
+ if ((!params.inputPath.isEmpty && !params.database.isEmpty) ||
+ (params.inputPath.isEmpty && params.database.isEmpty)) {
+ failure(s"(input) and (database) options are mutually exclusive - please use one or the other.")
+ } else {
+ success
+ }
+ }
}
case class SuspiciousConnectsConfig(analysis: String = "",
inputPath: String = "",
database: String = "",
dataTable: String = "",
- year: String = "",
- month: String = "",
- day: String = "",
+ year: Int = 0,
+ month: Int = 0,
+ day: Int = 0,
feedbackFile: String = "",
duplicationFactor: Int = 1,
topicCount: Int = 20,
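The mutual-exclusion pattern above is standard scopt 3.x. Stripped down to just the two options and the cross-check, it looks like this (hypothetical Config and object names; a sketch of the pattern, not the full parser):

  import scopt.OptionParser

  object ExclusiveOptsSketch {

    case class Config(inputPath: String = "", database: String = "")

    val parser = new OptionParser[Config]("suspicious-connects") {
      opt[String]("input").optional().valueName("<hdfs path>").
        action((x, c) => c.copy(inputPath = x)).
        text("HDFS path to input")

      opt[String]("database").optional().valueName("<database name>").
        action((x, c) => c.copy(database = x)).
        text("Database name")

      // Fails when both or neither option is present.
      checkConfig { c =>
        if (c.inputPath.isEmpty == c.database.isEmpty)
          failure("(input) and (database) options are mutually exclusive - please use one or the other.")
        else success
      }
    }

    def main(args: Array[String]): Unit =
      // parse returns None (after printing the failure) when the check fails
      println(parser.parse(args, Config()))
  }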
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
index d302d16..77fd79f 100644
--- a/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
+++ b/spot-ml/src/main/scala/org/apache/spot/utilities/data/InputOutputDataHandler.scala
@@ -17,10 +17,11 @@
package org.apache.spot.utilities.data
-import org.apache.hadoop.fs.{LocatedFileStatus, Path, RemoteIterator, FileUtil => fileUtil}
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator, FileUtil => fileUtil}
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}
-
+import org.apache.spark.rdd.RDD
/**
* Handles input and output data for every data set or pipeline implementation.
@@ -31,15 +32,18 @@ object InputOutputDataHandler {
/**
*
* @param sparkSession Spark Session.
- * @param inputPath HDFS input folder for every execution; flow, dns or proxy.
+ * @param inputPath SQL expression to select relevant data for flow, dns or proxy, OR an HDFS input data path.
* @param logger Application logger.
+ * @param isHive whether or not inputPath is a Hive query (true) OR an HDFS input data path (false).
* @return raw data frame.
*/
- def getInputDataFrame(sparkSession: SparkSession, inputPath: String, logger: Logger): Option[DataFrame] = {
+ def getInputDataFrame(sparkSession: SparkSession, inputPath: String, logger: Logger, isHive: Boolean): Option[DataFrame] = {
try {
logger.info("Loading data from: " + inputPath)
- //Some(sparkSession.read.parquet(inputPath))
- Some(SparkSession.sql(inputPath))
+ if (isHive)
+ Some(sparkSession.sql(inputPath))
+ else
+ Some(sparkSession.read.parquet(inputPath))
} catch {
case _: Throwable => None
}
@@ -48,6 +52,34 @@ object InputOutputDataHandler {
/**
*
* @param sparkSession Spark Session.
+ * @param feedbackFile Feedback file location.
+ * @return new RDD[String] with feedback, or an empty RDD if the file does not exist.
+ */
+ def getFeedbackRDD(sparkSession: SparkSession, feedbackFile: String): RDD[String] = {
+
+ val hadoopConfiguration = sparkSession.sparkContext.hadoopConfiguration
+ val fs = FileSystem.get(hadoopConfiguration)
+
+ // We need to pass a default value "file" if feedbackFile is "" to avoid the error
+ // java.lang.IllegalArgumentException: Can not create a Path from an empty string
+ // thrown when trying to create a new Path object from an empty string.
+ val fileExists = fs.exists(new Path(if (feedbackFile == "") "file" else feedbackFile))
+
+ if (fileExists) {
+
+ // feedback file is a tab-separated file with a single header line. We need to remove the header
+ val lines = sparkSession.sparkContext.textFile(feedbackFile)
+ val header = lines.first()
+ lines.filter(line => line != header)
+
+ } else {
+ sparkSession.sparkContext.emptyRDD[String]
+ }
+ }
+
+ /**
+ *
+ * @param sparkSession Spark Session
* @param hdfsScoredConnect HDFS output folder. The location where results were saved; flow, dns or proxy.
* @param analysis Data type to analyze.
* @param logger Application Logger.
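The header handling in getFeedbackRDD is the usual first()/filter idiom for RDDs. Isolated, it is just (sketch with a hypothetical wrapper name; assumes an existing SparkSession and a tab-separated file with a single header line):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SparkSession

  object FeedbackSketch {
    def withoutHeader(spark: SparkSession, path: String): RDD[String] = {
      val lines  = spark.sparkContext.textFile(path)
      val header = lines.first()            // the single header row
      lines.filter(line => line != header)  // keep data rows only
    }
  }

Note the same trade-off as the committed code: any data row that is textually identical to the header line is dropped as well.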
http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/579be080/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
----------------------------------------------------------------------
diff --git a/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala b/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
index f61b5ed..47aeea9 100644
--- a/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
+++ b/spot-ml/src/test/scala/org/apache/spot/SuspiciousConnectsArgumentParserTest.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spot
import org.apache.spot.SuspiciousConnectsArgumentParser.SuspiciousConnectsConfig
@@ -5,7 +22,50 @@ import org.scalatest.{FlatSpec, Matchers}
class SuspiciousConnectsArgumentParserTest extends FlatSpec with Matchers {
- "Argument parser" should "parse parameters correctly" in {
+ "Argument parser" should "parse (database/table input) parameters correctly" in {
+
+ val args = Array("--analysis", "dns",
+ "--database", "testdb",
+ "--datatable", "testtable",
+ "--year", "2017",
+ "--month", "11",
+ "--day", "1",
+ "--dupfactor", "1000",
+ "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
+ "--ldatopiccount", "10",
+ "--scored", "/user/user/dns/test/scored_results/scores",
+ "--threshold", "1.1",
+ "--maxresults", "20",
+ "--ldamaxiterations", "20",
+ "--ldaalpha", "0.0009",
+ "--ldabeta", "0.00001",
+ "--ldaoptimizer", "online",
+ "--precision", "64",
+ "--userdomain", "mycompany")
+
+ val parser = SuspiciousConnectsArgumentParser.parser
+ val config = parser.parse(args, SuspiciousConnectsConfig()) match {
+ case Some(config) => config
+ case None => SuspiciousConnectsConfig()
+ }
+
+ config.analysis shouldBe "dns"
+ config.inputPath shouldBe ""
+ config.database shouldBe "testdb"
+ config.dataTable shouldBe "testtable"
+ config.year shouldBe 2017
+ config.month shouldBe 11
+ config.day shouldBe 1
+ config.duplicationFactor shouldBe 1000
+ config.topicCount shouldBe 10
+ config.ldaAlpha shouldBe 0.0009
+ config.ldaBeta shouldBe 0.00001
+ config.ldaOptimizer shouldBe "online"
+
+ }
+
+
+ "Argument parser" should "parse (HDFS Parquet input) parameters correctly" in {
val args = Array("--analysis", "dns",
"--input", "user/spot-data",
@@ -29,6 +89,8 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec with Matchers {
}
config.analysis shouldBe "dns"
+ config.inputPath shouldBe "user/spot-data"
+ config.database shouldBe ""
config.duplicationFactor shouldBe 1000
config.topicCount shouldBe 10
config.ldaAlpha shouldBe 0.0009
@@ -39,7 +101,11 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec with Matchers {
it should "parse parameters and take defaults when optional parameters are not provided" in {
val args = Array("--analysis", "dns",
- "--input", "user/spot-data",
+ "--database", "testdb",
+ "--datatable", "testtable",
+ "--year", "2017",
+ "--month", "11",
+ "--day", "1",
"--feedback", "/home/user/ml/dns/test/dns_scores.csv",
"--ldatopiccount", "10",
"--scored", "/user/user/dns/test/scored_results/scores",
@@ -54,6 +120,11 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec with Matchers {
}
config.analysis shouldBe "dns"
+ config.database shouldBe "testdb"
+ config.dataTable shouldBe "testtable"
+ config.year shouldBe 2017
+ config.month shouldBe 11
+ config.day shouldBe 1
config.duplicationFactor shouldBe 1
config.topicCount shouldBe 10
config.ldaAlpha shouldBe 1.02
@@ -61,4 +132,67 @@ class SuspiciousConnectsArgumentParserTest extends FlatSpec with Matchers {
config.ldaOptimizer shouldBe "em"
config.outputDelimiter shouldBe "\t"
}
+
+
+ "Argument parser" should "return failure if both input HDFS path and input database/table are provided" in {
+
+ val args = Array("--analysis", "dns",
+ "--input", "user/spot-data",
+ "--database", "testdb",
+ "--datatable", "testtable",
+ "--year", "2017",
+ "--month", "11",
+ "--day", "1",
+ "--dupfactor", "1000",
+ "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
+ "--ldatopiccount", "10",
+ "--scored", "/user/user/dns/test/scored_results/scores",
+ "--threshold", "1.1",
+ "--maxresults", "20",
+ "--ldamaxiterations", "20",
+ "--ldaalpha", "0.0009",
+ "--ldabeta", "0.00001",
+ "--ldaoptimizer", "online",
+ "--precision", "64",
+ "--userdomain", "mycompany")
+
+ val parser = SuspiciousConnectsArgumentParser.parser
+ val config = parser.parse(args, SuspiciousConnectsConfig()) match {
+ case Some(config) => config
+ case None => SuspiciousConnectsConfig()
+ }
+
+ config.analysis shouldBe ""
+ }
+
+
+ "Argument parser" should "return failure if any input database/table parameters are missing" in {
+
+ val args = Array("--analysis", "dns",
+ "--database", "testdb",
+ "--datatable", "testtable",
+ "--year", "2017",
+ "--day", "1",
+ "--dupfactor", "1000",
+ "--feedback", "/home/user/ml/dns/test/dns_scores.csv",
+ "--ldatopiccount", "10",
+ "--scored", "/user/user/dns/test/scored_results/scores",
+ "--threshold", "1.1",
+ "--maxresults", "20",
+ "--ldamaxiterations", "20",
+ "--ldaalpha", "0.0009",
+ "--ldabeta", "0.00001",
+ "--ldaoptimizer", "online",
+ "--precision", "64",
+ "--userdomain", "mycompany")
+
+ val parser = SuspiciousConnectsArgumentParser.parser
+ val config = parser.parse(args, SuspiciousConnectsConfig()) match {
+ case Some(config) => config
+ case None => SuspiciousConnectsConfig()
+ }
+
+ config.analysis shouldBe ""
+ }
}