Posted to commits@spot.apache.org by na...@apache.org on 2018/03/19 15:12:53 UTC

[01/13] incubator-spot git commit: [SPOT-213] [OA] add kerberos requirements

Repository: incubator-spot
Updated Branches:
  refs/heads/master f722127ca -> 14dbd511d


[SPOT-213] [OA] add kerberos requirements


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/7376c5e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/7376c5e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/7376c5e4

Branch: refs/heads/master
Commit: 7376c5e4ef186365dd19581ffefc2cfe015b7529
Parents: 6deaae3
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 10:48:07 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 10:48:07 2018 -0800

----------------------------------------------------------------------
 spot-oa/kerberos-requirements.txt | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/7376c5e4/spot-oa/kerberos-requirements.txt
----------------------------------------------------------------------
diff --git a/spot-oa/kerberos-requirements.txt b/spot-oa/kerberos-requirements.txt
new file mode 100644
index 0000000..ee4cae4
--- /dev/null
+++ b/spot-oa/kerberos-requirements.txt
@@ -0,0 +1,3 @@
+thrift_sasl==0.2.1
+sasl
+requests-kerberos
\ No newline at end of file
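
A quick usage note: the Kerberos client libraries listed above can be installed with pip from the repository root, roughly as below; the sasl package builds native code, so the system SASL headers (for example libsasl2-dev or cyrus-sasl-devel) are assumed to be installed already.

    pip install -r spot-oa/kerberos-requirements.txt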


[07/13] incubator-spot git commit: [SPOT-213][SPOT-216] [setup] updated scripts, documentation and spot.conf to support multiple DB engines

Posted by na...@apache.org.
[SPOT-213][SPOT-216] [setup] updated scripts, documentation and spot.conf to support multiple DB engines


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/49f4934c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/49f4934c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/49f4934c

Branch: refs/heads/master
Commit: 49f4934c47e32ccda80111025ececf9e53780f11
Parents: 3383c07
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 12:32:09 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 12:32:09 2018 -0800

----------------------------------------------------------------------
 spot-setup/README.md     |   7 +++
 spot-setup/hdfs_setup.sh | 120 +++++++++++++++++++++++++++++++++++++-----
 spot-setup/spot.conf     |  28 +++++++++-
 3 files changed, 139 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/49f4934c/spot-setup/README.md
----------------------------------------------------------------------
diff --git a/spot-setup/README.md b/spot-setup/README.md
index 1d486a6..c5d245a 100644
--- a/spot-setup/README.md
+++ b/spot-setup/README.md
@@ -21,6 +21,11 @@ To collaborate and run spot-setup, it is required the following prerequisites:
 
 The main script in the repository is **hdfs_setup.sh**, which is responsible for loading environment variables, creating folders in Hadoop for the different use cases (flow, DNS or Proxy), creating the Impala database, and finally executing the Impala query scripts that create the tables needed to access netflow, DNS and proxy data.
 
+Options:
+--no-sudo     execute commands as the current user while setting `HADOOP_USER_NAME=hdfs`
+-c            specify a custom location for spot.conf; defaults to /etc/spot.conf
+-d            specify which database client to use, e.g. `-d beeline` NOTE: Impala supports Kerberos
+
 ## Environment Variables
 
 **spot.conf** is the file storing the variables needed during the installation process including node assignment, User interface, Machine Learning and Ingest gateway nodes.
@@ -33,6 +38,8 @@ To read more about these variables, please review the [documentation](http://spo
 
 spot-setup contains a script per use case, as of today, there is a table creation script for each DNS, flow and Proxy data.
 
+The HQL scripts are separated by the underlying database engine into subfolders of ./spot-setup/ (beeline, hive, impala).
+
 These HQL scripts are intended to be executed as Impala statements and must comply with HQL standards.
 
 We create tables using the Parquet format to get faster query performance. This format is an industry standard, and you can find more information about it at:
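
As an illustration of the options documented above (the configuration path below is an arbitrary placeholder, not taken from the commit), a run without sudo against a custom spot.conf and the Beeline client looks roughly like:

    ./hdfs_setup.sh --no-sudo -c /opt/spot/spot.conf -d beeline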

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/49f4934c/spot-setup/hdfs_setup.sh
----------------------------------------------------------------------
diff --git a/spot-setup/hdfs_setup.sh b/spot-setup/hdfs_setup.sh
index df898c8..6e73a20 100755
--- a/spot-setup/hdfs_setup.sh
+++ b/spot-setup/hdfs_setup.sh
@@ -17,6 +17,27 @@
 # limitations under the License.
 #
 
+set -e
+
+function log() {
+printf "hdfs_setup.sh:\n $1\n"
+}
+
+function safe_mkdir() {
+        # takes the hdfs command options and a directory
+        # checks for the directory before trying to create it
+        # keeps the script from exiting on folders that already exist
+        local hdfs_cmd=$1
+        local dir=$2
+        if hdfs dfs -test -d ${dir}; then
+            log "${dir} already exists"
+        else
+            log "running mkdir on ${dir}"
+            ${hdfs_cmd} dfs -mkdir ${dir}
+        fi
+}
+
+SPOTCONF="/etc/spot.conf"
 DSOURCES=('flow' 'dns' 'proxy')
 DFOLDERS=('binary' 
 'stage'
@@ -33,37 +54,108 @@ DFOLDERS=('binary'
 'hive/oa/threat_dendro'
 )
 
+
+# input options
+for arg in "$@"; do
+    case $arg in
+        "--no-sudo")
+            log "not using sudo"
+            no_sudo=true
+            shift
+            ;;
+        "-c")
+            shift
+            SPOTCONF=$1
+            log "Spot Configuration file: ${SPOTCONF}"
+            shift
+            ;;
+        "-d")
+            shift
+            db_override=$1
+            shift
+            ;;
+    esac
+done
+
 # Sourcing spot configuration variables
-source /etc/spot.conf
+log "Sourcing ${SPOTCONF}\n"
+source $SPOTCONF
+
+if [[ ${no_sudo} == "true" ]]; then
+    hdfs_cmd="hdfs"
+
+    if [[ ! -z "${HADOOP_USER_NAME}" ]]; then
+        log "HADOOP_USER_NAME: ${HADOOP_USER_NAME}"
+    else
+        log "setting HADOOP_USER_NAME to hdfs"
+        export HADOOP_USER_NAME=hdfs  # export so the child hdfs commands pick it up
+    fi
+else
+    hdfs_cmd="sudo -u hdfs hdfs"
+fi
+
+if [[ -z "${db_override}" ]]; then
+        DBENGINE=$(echo ${DBENGINE} | tr '[:upper:]' '[:lower:]')
+        log "setting database engine to ${DBENGINE}"
+else
+        DBENGINE=$(echo ${db_override} | tr '[:upper:]' '[:lower:]')
+        log "setting database engine to $db_override"
+fi
+
+case ${DBENGINE} in
+    impala)
+        db_shell="impala-shell -i ${IMPALA_DEM}"
+        if [[ ${KERBEROS} == "true" ]]; then
+            db_shell="${db_shell} -k"
+        fi
+        db_query="${db_shell} -q"
+        db_script="${db_shell} --var=huser=${HUSER} --var=dbname=${DBNAME} -c -f"
+        ;;
+    hive)
+        db_shell="hive"
+        db_query="${db_shell} -e"
+        db_script="${db_shell} -hiveconf huser=${HUSER} -hiveconf dbname=${DBNAME} -f"
+        ;;
+    beeline)
+        db_shell="beeline -u jdbc:${JDBC_URL}"
+        db_query="${db_shell} -e"
+        db_script="${db_shell} --hivevar huser=${HUSER} --hivevar dbname=${DBNAME} -f"
+        ;;
+    *)
+        log "DBENGINE not supported or not set in spot.conf: DBENGINE --> ${DBENGINE:-empty}"
+        exit 1
+        ;;
+esac
 
 # Creating HDFS user's folder
-sudo -u hdfs hdfs dfs -mkdir ${HUSER}
-sudo -u hdfs hdfs dfs -chown ${USER}:supergroup ${HUSER}
-sudo -u hdfs hdfs dfs -chmod 775 ${HUSER}
+safe_mkdir "${hdfs_cmd}" ${HUSER}  # quote so the multi-word sudo command is passed as one argument
+${hdfs_cmd} dfs -chown ${USER}:supergroup ${HUSER}
+${hdfs_cmd} dfs -chmod 775 ${HUSER}
 
 # Creating HDFS paths for each use case
 for d in "${DSOURCES[@]}" 
-do 
+do
 	echo "creating /$d"
-	hdfs dfs -mkdir ${HUSER}/$d 
+	safe_mkdir hdfs ${HUSER}/$d
 	for f in "${DFOLDERS[@]}" 
 	do 
 		echo "creating $d/$f"
-		hdfs dfs -mkdir ${HUSER}/$d/$f
+		safe_mkdir "${hdfs_cmd}" ${HUSER}/$d/$f
 	done
 
 	# Modifying permission on HDFS folders to allow Impala to read/write
 	hdfs dfs -chmod -R 775 ${HUSER}/$d
-	sudo -u hdfs hdfs dfs -setfacl -R -m user:impala:rwx ${HUSER}/$d
-	sudo -u hdfs hdfs dfs -setfacl -R -m user:${USER}:rwx ${HUSER}/$d
+	${hdfs_cmd} dfs -setfacl -R -m user:${db_override}:rwx ${HUSER}/$d
+	${hdfs_cmd} dfs -setfacl -R -m user:${USER}:rwx ${HUSER}/$d
 done
 
+
 # Creating Spot Database
-impala-shell -i ${IMPALA_DEM} -q "CREATE DATABASE IF NOT EXISTS ${DBNAME};"
+${db_query} "CREATE DATABASE IF NOT EXISTS ${DBNAME};"
+
 
-# Creating Impala tables
+# Creating tables
 for d in "${DSOURCES[@]}" 
-do 
-	impala-shell -i ${IMPALA_DEM} --var=huser=${HUSER} --var=dbname=${DBNAME} -c -f create_${d}_parquet.hql
+do
+	${db_script} "./${DBENGINE}/create_${d}_parquet.hql"
 done
-
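
For reference, with DBENGINE=impala and KERBEROS='true', and using the sample values from spot.conf (IMPALA_DEM=node04, HUSER=/user/spot, DBNAME=spot), the helpers built in the case statement above expand roughly as follows:

    # db_query  : impala-shell -i node04 -k -q
    # db_script : impala-shell -i node04 -k --var=huser=/user/spot --var=dbname=spot -c -f
    # so each table script ends up being executed as, e.g.:
    impala-shell -i node04 -k --var=huser=/user/spot --var=dbname=spot -c -f ./impala/create_flow_parquet.hql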

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/49f4934c/spot-setup/spot.conf
----------------------------------------------------------------------
diff --git a/spot-setup/spot.conf b/spot-setup/spot.conf
index a0cba3d..aa08ea7 100755
--- a/spot-setup/spot.conf
+++ b/spot-setup/spot.conf
@@ -19,7 +19,6 @@
 UINODE='node03'
 MLNODE='node04'
 GWNODE='node16'
-DBNAME='spot'
 
 #hdfs - base user and data source config
 HUSER='/user/spot'
@@ -30,10 +29,35 @@ PROXY_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/
 FLOW_PATH=${HUSER}/${DSOURCE}/hive/y=${YR}/m=${MH}/d=${DY}/
 HPATH=${HUSER}/${DSOURCE}/scored_results/${FDATE}
 
-#impala config
+# Database
+DBNAME='spot'
+DBENGINE="" # hive, impala and beeline are supported
+JDBC_URL="" # e.g. hive2://node01:10000/default;principal=hive/node01@REALM.COM
+
+# impala config
 IMPALA_DEM=node04
 IMPALA_PORT=21050
 
+# Hive Server2
+HS2_HOST=''
+HS2_PORT=''
+
+#kerberos config
+KERBEROS='false'
+KINIT=/usr/bin/kinit
+PRINCIPAL='user'
+KEYTAB='/opt/security/user.keytab'
+SASL_MECH='GSSAPI'
+SECURITY_PROTO='sasl_plaintext'
+KAFKA_SERVICE_NAME=''
+
+#ssl config
+SSL='false'
+SSL_VERIFY='true'
+CA_LOCATION=''
+CERT=''
+KEY=''
+
 #local fs base user and data source config
 LUSER='/home/spot'
 LPATH=${LUSER}/ml/${DSOURCE}/${FDATE}


[06/13] incubator-spot git commit: [SPOT-213][SPOT-216] [setup] moved script files to support additional engines such as beeline, impala

Posted by na...@apache.org.
[SPOT-213][SPOT-216] [setup] moved script files to support additional engines such as beeline, impala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/3383c07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/3383c07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/3383c07c

Branch: refs/heads/master
Commit: 3383c07cbaf695953facdc3c269c01af992abaae
Parents: 8b600c8
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 12:23:24 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 12:23:24 2018 -0800

----------------------------------------------------------------------
 spot-setup/beeline/create_dns_parquet.hql   | 162 +++++++++++++++++++
 spot-setup/beeline/create_flow_parquet.hql  | 194 ++++++++++++++++++++++
 spot-setup/beeline/create_proxy_parquet.hql | 179 ++++++++++++++++++++
 spot-setup/create_dns_parquet.hql           | 163 -------------------
 spot-setup/create_flow_parquet.hql          | 195 ----------------------
 spot-setup/create_proxy_parquet.hql         | 177 --------------------
 spot-setup/hive/create_dns_parquet.hql      | 165 +++++++++++++++++++
 spot-setup/hive/create_flow_parquet.hql     | 197 +++++++++++++++++++++++
 spot-setup/hive/create_proxy_parquet.hql    | 179 ++++++++++++++++++++
 spot-setup/impala/create_dns_parquet.hql    | 163 +++++++++++++++++++
 spot-setup/impala/create_flow_parquet.hql   | 195 ++++++++++++++++++++++
 spot-setup/impala/create_proxy_parquet.hql  | 177 ++++++++++++++++++++
 12 files changed, 1611 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/beeline/create_dns_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/beeline/create_dns_parquet.hql b/spot-setup/beeline/create_dns_parquet.hql
new file mode 100755
index 0000000..b9be108
--- /dev/null
+++ b/spot-setup/beeline/create_dns_parquet.hql
@@ -0,0 +1,162 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns (
+frame_time STRING, 
+unix_tstamp BIGINT,
+frame_len INT,
+ip_dst STRING,
+ip_src STRING,
+dns_qry_name STRING,
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT,
+dns_a STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT,
+h TINYINT
+)
+STORED AS PARQUET 
+LOCATION '${huser}/dns/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_dendro (
+unix_tstamp BIGINT,
+dns_a STRING,
+dns_qry_name STRING,
+ip_dst STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/dendro';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_edge (
+unix_tstamp BIGINT,
+frame_len BIGINT,
+ip_dst STRING,
+ip_src STRING,
+dns_qry_name STRING,
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT,
+dns_a STRING,
+hh INT,
+dns_qry_class_name STRING,
+dns_qry_type_name STRING,
+dns_qry_rcode_name STRING,
+network_context STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_ingest_summary (
+tdate STRING,
+total BIGINT
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_scores (
+frame_time STRING, 
+unix_tstamp BIGINT,
+frame_len BIGINT,
+ip_dst STRING, 
+dns_qry_name STRING, 
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT, 
+ml_score FLOAT,
+tld STRING,
+query_rep STRING,
+hh INT,
+dns_qry_class_name STRING, 
+dns_qry_type_name STRING,
+dns_qry_rcode_name STRING, 
+network_context STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_storyboard (
+ip_threat STRING,
+dns_threat STRING, 
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_threat_dendro (
+anchor STRING, 
+total BIGINT,
+dns_qry_name STRING, 
+ip_dst STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/threat_dendro';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.dns_threat_investigation (
+unix_tstamp BIGINT,
+ip_dst STRING, 
+dns_qry_name STRING, 
+ip_sev INT,
+dns_sev INT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/dns/hive/oa/threat_investigation';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/beeline/create_flow_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/beeline/create_flow_parquet.hql b/spot-setup/beeline/create_flow_parquet.hql
new file mode 100755
index 0000000..25e860a
--- /dev/null
+++ b/spot-setup/beeline/create_flow_parquet.hql
@@ -0,0 +1,194 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow (
+treceived STRING,
+unix_tstamp BIGINT,
+tryear INT,
+trmonth INT,
+trday INT,
+trhour INT,
+trminute INT,
+trsec INT,
+tdur FLOAT,
+sip STRING,
+dip STRING,
+sport INT,
+dport INT,
+proto STRING,
+flag STRING,
+fwd INT,
+stos INT,
+ipkt BIGINT,
+ibyt BIGINT,
+opkt BIGINT, 
+obyt BIGINT,
+input INT,
+output INT,
+sas INT,
+das INT,
+dtos INT,
+dir INT,
+rip STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT,
+h TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_chords (
+ip_threat STRING,
+srcip STRING,
+dstip STRING,
+ibyt BIGINT, 
+ipkt BIGINT
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive/oa/chords';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_edge (
+tstart STRING, 
+srcip STRING,
+dstip STRING,
+sport INT, 
+dport INT, 
+proto STRING,
+flags STRING,
+tos INT, 
+ibyt BIGINT, 
+ipkt BIGINT, 
+input BIGINT,
+output BIGINT, 
+rip STRING,
+obyt BIGINT, 
+opkt BIGINT, 
+hh INT,
+mn INT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_ingest_summary (
+tdate STRING,
+total BIGINT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_scores (
+tstart STRING, 
+srcip STRING,
+dstip STRING,
+sport INT, 
+dport INT, 
+proto STRING,
+ipkt INT,
+ibyt INT,
+opkt INT,
+obyt INT,
+ml_score FLOAT,
+rank INT,
+srcip_INTernal INT,
+dstip_INTernal INT,
+src_geoloc STRING, 
+dst_geoloc STRING, 
+src_domain STRING, 
+dst_domain STRING, 
+src_rep STRING,
+dst_rep STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_storyboard (
+ip_threat STRING,
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_threat_investigation (
+tstart STRING,
+srcip STRING, 
+dstip STRING, 
+srcport INT,
+dstport INT,
+score INT 
+) 
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+) 
+STORED AS PARQUET 
+LOCATION '${huser}/flow/hive/oa/threat_investigation';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.flow_timeline (
+ip_threat STRING,
+tstart STRING, 
+tend STRING, 
+srcip STRING,
+dstip STRING,
+proto STRING,
+sport INT, 
+dport INT, 
+ipkt BIGINT, 
+ibyt BIGINT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/flow/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/beeline/create_proxy_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/beeline/create_proxy_parquet.hql b/spot-setup/beeline/create_proxy_parquet.hql
new file mode 100755
index 0000000..d9cd79f
--- /dev/null
+++ b/spot-setup/beeline/create_proxy_parquet.hql
@@ -0,0 +1,179 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SET huser;
+SET dbname;
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy (
+p_date STRING,
+p_time STRING,
+clientip STRING,
+host STRING,
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+duration INT,
+username STRING,
+authgroup STRING,
+exceptionid STRING,
+filterresult STRING,
+webcat STRING,
+referer STRING,
+respcode STRING,
+action STRING,
+urischeme STRING,
+uriport STRING,
+uripath STRING,
+uriquery STRING,
+uriextension STRING,
+serverip STRING,
+scbytes INT,
+csbytes INT,
+virusid STRING,
+bcappname STRING,
+bcappoper STRING,
+fulluri STRING
+)
+PARTITIONED BY (
+y STRING,
+m STRING,
+d STRING,
+h STRING
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_edge (
+tdate STRING,
+time STRING, 
+clientip STRING, 
+host STRING, 
+webcat STRING, 
+respcode STRING, 
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+referer STRING,
+uriport STRING,
+serverip STRING, 
+scbytes INT, 
+csbytes INT, 
+fulluri STRING,
+hh INT,
+respcode_name STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_ingest_summary (
+tdate STRING,
+total BIGINT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_scores (
+tdate STRING,
+time STRING, 
+clientip STRING, 
+host STRING, 
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+duration INT,
+username STRING, 
+webcat STRING, 
+referer STRING,
+respcode INT,
+uriport INT, 
+uripath STRING,
+uriquery STRING, 
+serverip STRING, 
+scbytes INT, 
+csbytes INT, 
+fulluri STRING,
+word STRING, 
+ml_score FLOAT,
+uri_rep STRING,
+respcode_name STRING,
+network_context STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_storyboard (
+p_threat STRING, 
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_threat_investigation (
+tdate STRING,
+fulluri STRING,
+uri_sev INT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive/oa/threat_investigation';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${dbname}.proxy_timeline (
+p_threat STRING, 
+tstart STRING, 
+tend STRING, 
+duration BIGINT, 
+clientip STRING, 
+respcode STRING, 
+respcodename STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${huser}/proxy/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/create_dns_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/create_dns_parquet.hql b/spot-setup/create_dns_parquet.hql
deleted file mode 100755
index 38025c6..0000000
--- a/spot-setup/create_dns_parquet.hql
+++ /dev/null
@@ -1,163 +0,0 @@
-
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements.  See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License.  You may obtain a copy of the License at
-
---    http://www.apache.org/licenses/LICENSE-2.0
-
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns (
-frame_time STRING, 
-unix_tstamp BIGINT,
-frame_len INT,
-ip_dst STRING,
-ip_src STRING,
-dns_qry_name STRING,
-dns_qry_class STRING,
-dns_qry_type INT,
-dns_qry_rcode INT,
-dns_a STRING
-)
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT,
-h TINYINT
-)
-STORED AS PARQUET 
-LOCATION '${var:huser}/dns/hive';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_dendro (
-unix_tstamp BIGINT,
-dns_a STRING,
-dns_qry_name STRING,
-ip_dst STRING
-)
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/dendro';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_edge ( 
-unix_tstamp BIGINT,
-frame_len BIGINT,
-ip_dst STRING,
-ip_src STRING,
-dns_qry_name STRING,
-dns_qry_class STRING,
-dns_qry_type INT,
-dns_qry_rcode INT,
-dns_a STRING,
-hh INT,
-dns_qry_class_name STRING,
-dns_qry_type_name STRING,
-dns_qry_rcode_name STRING,
-network_context STRING
-)
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/edge';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_ingest_summary ( 
-tdate STRING,
-total BIGINT
-)
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/summary';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_scores ( 
-frame_time STRING, 
-unix_tstamp BIGINT,
-frame_len BIGINT,
-ip_dst STRING, 
-dns_qry_name STRING, 
-dns_qry_class STRING,
-dns_qry_type INT,
-dns_qry_rcode INT, 
-ml_score FLOAT,
-tld STRING,
-query_rep STRING,
-hh INT,
-dns_qry_class_name STRING, 
-dns_qry_type_name STRING,
-dns_qry_rcode_name STRING, 
-network_context STRING 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/suspicious';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_storyboard ( 
-ip_threat STRING,
-dns_threat STRING, 
-title STRING,
-text STRING
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/storyboard';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_dendro (
-anchor STRING, 
-total BIGINT,
-dns_qry_name STRING, 
-ip_dst STRING
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/threat_dendro';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.dns_threat_investigation ( 
-unix_tstamp BIGINT,
-ip_dst STRING, 
-dns_qry_name STRING, 
-ip_sev INT,
-dns_sev INT
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/dns/hive/oa/threat_investigation';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/create_flow_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/create_flow_parquet.hql b/spot-setup/create_flow_parquet.hql
deleted file mode 100755
index 41c4819..0000000
--- a/spot-setup/create_flow_parquet.hql
+++ /dev/null
@@ -1,195 +0,0 @@
-
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements.  See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License.  You may obtain a copy of the License at
-
---    http://www.apache.org/licenses/LICENSE-2.0
-
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow (
-treceived STRING,
-unix_tstamp BIGINT,
-tryear INT,
-trmonth INT,
-trday INT,
-trhour INT,
-trminute INT,
-trsec INT,
-tdur FLOAT,
-sip STRING,
-dip STRING,
-sport INT,
-dport INT,
-proto STRING,
-flag STRING,
-fwd INT,
-stos INT,
-ipkt BIGINT,
-ibyt BIGINT,
-opkt BIGINT, 
-obyt BIGINT,
-input INT,
-output INT,
-sas INT,
-das INT,
-dtos INT,
-dir INT,
-rip STRING
-)
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT,
-h TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_chords (
-ip_threat STRING,
-srcip STRING,
-dstip STRING,
-ibyt BIGINT, 
-ipkt BIGINT
-)
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/chords';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_edge (
-tstart STRING, 
-srcip STRING,
-dstip STRING,
-sport INT, 
-dport INT, 
-proto STRING,
-flags STRING,
-tos INT, 
-ibyt BIGINT, 
-ipkt BIGINT, 
-input BIGINT,
-output BIGINT, 
-rip STRING,
-obyt BIGINT, 
-opkt BIGINT, 
-hh INT,
-mn INT 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/edge';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_ingest_summary (
-tdate STRING,
-total BIGINT 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/summary';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_scores (
-tstart STRING, 
-srcip STRING,
-dstip STRING,
-sport INT, 
-dport INT, 
-proto STRING,
-ipkt INT,
-ibyt INT,
-opkt INT,
-obyt INT,
-ml_score FLOAT,
-rank INT,
-srcip_INTernal INT,
-dstip_INTernal INT,
-src_geoloc STRING, 
-dst_geoloc STRING, 
-src_domain STRING, 
-dst_domain STRING, 
-src_rep STRING,
-dst_rep STRING 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/suspicious';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_storyboard (
-ip_threat STRING,
-title STRING,
-text STRING
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/storyboard';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_threat_investigation ( 
-tstart STRING,
-srcip STRING, 
-dstip STRING, 
-srcport INT,
-dstport INT,
-score INT 
-) 
-PARTITIONED BY (
-y SMALLINT,
-m TINYINT,
-d TINYINT
-) 
-STORED AS PARQUET 
-LOCATION '${var:huser}/flow/hive/oa/threat_investigation';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.flow_timeline (
-ip_threat STRING,
-tstart STRING, 
-tend STRING, 
-srcip STRING,
-dstip STRING,
-proto STRING,
-sport INT, 
-dport INT, 
-ipkt BIGINT, 
-ibyt BIGINT
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/flow/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/create_proxy_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/create_proxy_parquet.hql b/spot-setup/create_proxy_parquet.hql
deleted file mode 100755
index f665dc2..0000000
--- a/spot-setup/create_proxy_parquet.hql
+++ /dev/null
@@ -1,177 +0,0 @@
-
--- Licensed to the Apache Software Foundation (ASF) under one or more
--- contributor license agreements.  See the NOTICE file distributed with
--- this work for additional information regarding copyright ownership.
--- The ASF licenses this file to You under the Apache License, Version 2.0
--- (the "License"); you may not use this file except in compliance with
--- the License.  You may obtain a copy of the License at
-
---    http://www.apache.org/licenses/LICENSE-2.0
-
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-
-CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy (
-p_date STRING,
-p_time STRING,
-clientip STRING,
-host STRING,
-reqmethod STRING,
-useragent STRING,
-resconttype STRING,
-duration INT,
-username STRING,
-authgroup STRING,
-exceptionid STRING,
-filterresult STRING,
-webcat STRING,
-referer STRING,
-respcode STRING,
-action STRING,
-urischeme STRING,
-uriport STRING,
-uripath STRING,
-uriquery STRING,
-uriextension STRING,
-serverip STRING,
-scbytes INT,
-csbytes INT,
-virusid STRING,
-bcappname STRING,
-bcappoper STRING,
-fulluri STRING
-)
-PARTITIONED BY (
-y STRING,
-m STRING,
-d STRING,
-h STRING
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_edge ( 
-tdate STRING,
-time STRING, 
-clientip STRING, 
-host STRING, 
-webcat STRING, 
-respcode STRING, 
-reqmethod STRING,
-useragent STRING,
-resconttype STRING,
-referer STRING,
-uriport STRING,
-serverip STRING, 
-scbytes INT, 
-csbytes INT, 
-fulluri STRING,
-hh INT,
-respcode_name STRING 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/edge';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_ingest_summary ( 
-tdate STRING,
-total BIGINT 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/summary';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_scores ( 
-tdate STRING,
-time STRING, 
-clientip STRING, 
-host STRING, 
-reqmethod STRING,
-useragent STRING,
-resconttype STRING,
-duration INT,
-username STRING, 
-webcat STRING, 
-referer STRING,
-respcode INT,
-uriport INT, 
-uripath STRING,
-uriquery STRING, 
-serverip STRING, 
-scbytes INT, 
-csbytes INT, 
-fulluri STRING,
-word STRING, 
-ml_score FLOAT,
-uri_rep STRING,
-respcode_name STRING,
-network_context STRING 
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/suspicious';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_storyboard ( 
-p_threat STRING, 
-title STRING,
-text STRING
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/storyboard';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_threat_investigation ( 
-tdate STRING,
-fulluri STRING,
-uri_sev INT
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/threat_investigation';
-
-
-CREATE EXTERNAL TABLE ${var:dbname}.proxy_timeline ( 
-p_threat STRING, 
-tstart STRING, 
-tend STRING, 
-duration BIGINT, 
-clientip STRING, 
-respcode STRING, 
-respcodename STRING
-)
-PARTITIONED BY ( 
-y SMALLINT,
-m TINYINT,
-d TINYINT
-)
-STORED AS PARQUET
-LOCATION '${var:huser}/proxy/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/hive/create_dns_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/hive/create_dns_parquet.hql b/spot-setup/hive/create_dns_parquet.hql
new file mode 100755
index 0000000..8e31ed3
--- /dev/null
+++ b/spot-setup/hive/create_dns_parquet.hql
@@ -0,0 +1,165 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SET hiveconf:huser;
+SET hiveconf:dbname;
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns (
+frame_time STRING, 
+unix_tstamp BIGINT,
+frame_len INT,
+ip_dst STRING,
+ip_src STRING,
+dns_qry_name STRING,
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT,
+dns_a STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT,
+h TINYINT
+)
+STORED AS PARQUET 
+LOCATION '${hiveconf:huser}/dns/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_dendro (
+unix_tstamp BIGINT,
+dns_a STRING,
+dns_qry_name STRING,
+ip_dst STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/dendro';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_edge (
+unix_tstamp BIGINT,
+frame_len BIGINT,
+ip_dst STRING,
+ip_src STRING,
+dns_qry_name STRING,
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT,
+dns_a STRING,
+hh INT,
+dns_qry_class_name STRING,
+dns_qry_type_name STRING,
+dns_qry_rcode_name STRING,
+network_context STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_ingest_summary (
+tdate STRING,
+total BIGINT
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_scores (
+frame_time STRING, 
+unix_tstamp BIGINT,
+frame_len BIGINT,
+ip_dst STRING, 
+dns_qry_name STRING, 
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT, 
+ml_score FLOAT,
+tld STRING,
+query_rep STRING,
+hh INT,
+dns_qry_class_name STRING, 
+dns_qry_type_name STRING,
+dns_qry_rcode_name STRING, 
+network_context STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_storyboard (
+ip_threat STRING,
+dns_threat STRING, 
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_threat_dendro (
+anchor STRING, 
+total BIGINT,
+dns_qry_name STRING, 
+ip_dst STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/threat_dendro';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.dns_threat_investigation (
+unix_tstamp BIGINT,
+ip_dst STRING, 
+dns_qry_name STRING, 
+ip_sev INT,
+dns_sev INT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/dns/hive/oa/threat_investigation';
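
For a direct run with the Hive CLI (paths and values here are illustrative), the -hiveconf options provide the values consumed by the ${hiveconf:...} references above:

    hive -hiveconf huser=/user/spot -hiveconf dbname=spot \
        -f spot-setup/hive/create_dns_parquet.hql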

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/hive/create_flow_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/hive/create_flow_parquet.hql b/spot-setup/hive/create_flow_parquet.hql
new file mode 100755
index 0000000..034e194
--- /dev/null
+++ b/spot-setup/hive/create_flow_parquet.hql
@@ -0,0 +1,197 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SET hiveconf:huser;
+SET hiveconf:dbname;
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow (
+treceived STRING,
+unix_tstamp BIGINT,
+tryear INT,
+trmonth INT,
+trday INT,
+trhour INT,
+trminute INT,
+trsec INT,
+tdur FLOAT,
+sip STRING,
+dip STRING,
+sport INT,
+dport INT,
+proto STRING,
+flag STRING,
+fwd INT,
+stos INT,
+ipkt BIGINT,
+ibyt BIGINT,
+opkt BIGINT, 
+obyt BIGINT,
+input INT,
+output INT,
+sas INT,
+das INT,
+dtos INT,
+dir INT,
+rip STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT,
+h TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_chords (
+ip_threat STRING,
+srcip STRING,
+dstip STRING,
+ibyt BIGINT, 
+ipkt BIGINT
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive/oa/chords';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_edge (
+tstart STRING, 
+srcip STRING,
+dstip STRING,
+sport INT, 
+dport INT, 
+proto STRING,
+flags STRING,
+tos INT, 
+ibyt BIGINT, 
+ipkt BIGINT, 
+input BIGINT,
+output BIGINT, 
+rip STRING,
+obyt BIGINT, 
+opkt BIGINT, 
+hh INT,
+mn INT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_ingest_summary (
+tdate STRING,
+total BIGINT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_scores (
+tstart STRING, 
+srcip STRING,
+dstip STRING,
+sport INT, 
+dport INT, 
+proto STRING,
+ipkt INT,
+ibyt INT,
+opkt INT,
+obyt INT,
+ml_score FLOAT,
+rank INT,
+srcip_INTernal INT,
+dstip_INTernal INT,
+src_geoloc STRING, 
+dst_geoloc STRING, 
+src_domain STRING, 
+dst_domain STRING, 
+src_rep STRING,
+dst_rep STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_storyboard (
+ip_threat STRING,
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_threat_investigation (
+tstart STRING,
+srcip STRING, 
+dstip STRING, 
+srcport INT,
+dstport INT,
+score INT 
+) 
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+) 
+STORED AS PARQUET 
+LOCATION '${hiveconf:huser}/flow/hive/oa/threat_investigation';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.flow_timeline (
+ip_threat STRING,
+tstart STRING, 
+tend STRING, 
+srcip STRING,
+dstip STRING,
+proto STRING,
+sport INT, 
+dport INT, 
+ipkt BIGINT, 
+ibyt BIGINT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/flow/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/hive/create_proxy_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/hive/create_proxy_parquet.hql b/spot-setup/hive/create_proxy_parquet.hql
new file mode 100755
index 0000000..16d90c0
--- /dev/null
+++ b/spot-setup/hive/create_proxy_parquet.hql
@@ -0,0 +1,179 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SET hiveconf:huser;
+SET hiveconf:dbname;
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy (
+p_date STRING,
+p_time STRING,
+clientip STRING,
+host STRING,
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+duration INT,
+username STRING,
+authgroup STRING,
+exceptionid STRING,
+filterresult STRING,
+webcat STRING,
+referer STRING,
+respcode STRING,
+action STRING,
+urischeme STRING,
+uriport STRING,
+uripath STRING,
+uriquery STRING,
+uriextension STRING,
+serverip STRING,
+scbytes INT,
+csbytes INT,
+virusid STRING,
+bcappname STRING,
+bcappoper STRING,
+fulluri STRING
+)
+PARTITIONED BY (
+y STRING,
+m STRING,
+d STRING,
+h STRING
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_edge (
+tdate STRING,
+time STRING, 
+clientip STRING, 
+host STRING, 
+webcat STRING, 
+respcode STRING, 
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+referer STRING,
+uriport STRING,
+serverip STRING, 
+scbytes INT, 
+csbytes INT, 
+fulluri STRING,
+hh INT,
+respcode_name STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_ingest_summary (
+tdate STRING,
+total BIGINT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_scores (
+tdate STRING,
+time STRING, 
+clientip STRING, 
+host STRING, 
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+duration INT,
+username STRING, 
+webcat STRING, 
+referer STRING,
+respcode INT,
+uriport INT, 
+uripath STRING,
+uriquery STRING, 
+serverip STRING, 
+scbytes INT, 
+csbytes INT, 
+fulluri STRING,
+word STRING, 
+ml_score FLOAT,
+uri_rep STRING,
+respcode_name STRING,
+network_context STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_storyboard (
+p_threat STRING, 
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_threat_investigation (
+tdate STRING,
+fulluri STRING,
+uri_sev INT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive/oa/threat_investigation';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${hiveconf:dbname}.proxy_timeline (
+p_threat STRING, 
+tstart STRING, 
+tend STRING, 
+duration BIGINT, 
+clientip STRING, 
+respcode STRING, 
+respcodename STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${hiveconf:huser}/proxy/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/impala/create_dns_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/impala/create_dns_parquet.hql b/spot-setup/impala/create_dns_parquet.hql
new file mode 100755
index 0000000..274ea9d
--- /dev/null
+++ b/spot-setup/impala/create_dns_parquet.hql
@@ -0,0 +1,163 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns (
+frame_time STRING, 
+unix_tstamp BIGINT,
+frame_len INT,
+ip_dst STRING,
+ip_src STRING,
+dns_qry_name STRING,
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT,
+dns_a STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT,
+h TINYINT
+)
+STORED AS PARQUET 
+LOCATION '${var:huser}/dns/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_dendro (
+unix_tstamp BIGINT,
+dns_a STRING,
+dns_qry_name STRING,
+ip_dst STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/dendro';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_edge (
+unix_tstamp BIGINT,
+frame_len BIGINT,
+ip_dst STRING,
+ip_src STRING,
+dns_qry_name STRING,
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT,
+dns_a STRING,
+hh INT,
+dns_qry_class_name STRING,
+dns_qry_type_name STRING,
+dns_qry_rcode_name STRING,
+network_context STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_ingest_summary (
+tdate STRING,
+total BIGINT
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_scores (
+frame_time STRING, 
+unix_tstamp BIGINT,
+frame_len BIGINT,
+ip_dst STRING, 
+dns_qry_name STRING, 
+dns_qry_class STRING,
+dns_qry_type INT,
+dns_qry_rcode INT, 
+ml_score FLOAT,
+tld STRING,
+query_rep STRING,
+hh INT,
+dns_qry_class_name STRING, 
+dns_qry_type_name STRING,
+dns_qry_rcode_name STRING, 
+network_context STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_storyboard (
+ip_threat STRING,
+dns_threat STRING, 
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_threat_dendro (
+anchor STRING, 
+total BIGINT,
+dns_qry_name STRING, 
+ip_dst STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/threat_dendro';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.dns_threat_investigation (
+unix_tstamp BIGINT,
+ip_dst STRING, 
+dns_qry_name STRING, 
+ip_sev INT,
+dns_sev INT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/dns/hive/oa/threat_investigation';
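
For a direct run with impala-shell (values are illustrative; -k is only needed on kerberized clusters), the --var options provide the values consumed by the ${var:...} references above:

    impala-shell -i node04 -k --var=huser=/user/spot --var=dbname=spot \
        -c -f spot-setup/impala/create_dns_parquet.hql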

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/impala/create_flow_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/impala/create_flow_parquet.hql b/spot-setup/impala/create_flow_parquet.hql
new file mode 100755
index 0000000..c8d3481
--- /dev/null
+++ b/spot-setup/impala/create_flow_parquet.hql
@@ -0,0 +1,195 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow (
+treceived STRING,
+unix_tstamp BIGINT,
+tryear INT,
+trmonth INT,
+trday INT,
+trhour INT,
+trminute INT,
+trsec INT,
+tdur FLOAT,
+sip STRING,
+dip STRING,
+sport INT,
+dport INT,
+proto STRING,
+flag STRING,
+fwd INT,
+stos INT,
+ipkt BIGINT,
+ibyt BIGINT,
+opkt BIGINT, 
+obyt BIGINT,
+input INT,
+output INT,
+sas INT,
+das INT,
+dtos INT,
+dir INT,
+rip STRING
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT,
+h TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_chords (
+ip_threat STRING,
+srcip STRING,
+dstip STRING,
+ibyt BIGINT, 
+ipkt BIGINT
+)
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive/oa/chords';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_edge (
+tstart STRING, 
+srcip STRING,
+dstip STRING,
+sport INT, 
+dport INT, 
+proto STRING,
+flags STRING,
+tos INT, 
+ibyt BIGINT, 
+ipkt BIGINT, 
+input BIGINT,
+output BIGINT, 
+rip STRING,
+obyt BIGINT, 
+opkt BIGINT, 
+hh INT,
+mn INT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_ingest_summary (
+tdate STRING,
+total BIGINT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_scores (
+tstart STRING, 
+srcip STRING,
+dstip STRING,
+sport INT, 
+dport INT, 
+proto STRING,
+ipkt INT,
+ibyt INT,
+opkt INT,
+obyt INT,
+ml_score FLOAT,
+rank INT,
+srcip_INTernal INT,
+dstip_INTernal INT,
+src_geoloc STRING, 
+dst_geoloc STRING, 
+src_domain STRING, 
+dst_domain STRING, 
+src_rep STRING,
+dst_rep STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_storyboard (
+ip_threat STRING,
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_threat_investigation (
+tstart STRING,
+srcip STRING, 
+dstip STRING, 
+srcport INT,
+dstport INT,
+score INT 
+) 
+PARTITIONED BY (
+y SMALLINT,
+m TINYINT,
+d TINYINT
+) 
+STORED AS PARQUET 
+LOCATION '${var:huser}/flow/hive/oa/threat_investigation';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.flow_timeline (
+ip_threat STRING,
+tstart STRING, 
+tend STRING, 
+srcip STRING,
+dstip STRING,
+proto STRING,
+sport INT, 
+dport INT, 
+ipkt BIGINT, 
+ibyt BIGINT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/flow/hive/oa/timeline';

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/3383c07c/spot-setup/impala/create_proxy_parquet.hql
----------------------------------------------------------------------
diff --git a/spot-setup/impala/create_proxy_parquet.hql b/spot-setup/impala/create_proxy_parquet.hql
new file mode 100755
index 0000000..ddf3283
--- /dev/null
+++ b/spot-setup/impala/create_proxy_parquet.hql
@@ -0,0 +1,177 @@
+
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+
+--    http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy (
+p_date STRING,
+p_time STRING,
+clientip STRING,
+host STRING,
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+duration INT,
+username STRING,
+authgroup STRING,
+exceptionid STRING,
+filterresult STRING,
+webcat STRING,
+referer STRING,
+respcode STRING,
+action STRING,
+urischeme STRING,
+uriport STRING,
+uripath STRING,
+uriquery STRING,
+uriextension STRING,
+serverip STRING,
+scbytes INT,
+csbytes INT,
+virusid STRING,
+bcappname STRING,
+bcappoper STRING,
+fulluri STRING
+)
+PARTITIONED BY (
+y STRING,
+m STRING,
+d STRING,
+h STRING
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_edge (
+tdate STRING,
+time STRING, 
+clientip STRING, 
+host STRING, 
+webcat STRING, 
+respcode STRING, 
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+referer STRING,
+uriport STRING,
+serverip STRING, 
+scbytes INT, 
+csbytes INT, 
+fulluri STRING,
+hh INT,
+respcode_name STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive/oa/edge';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_ingest_summary (
+tdate STRING,
+total BIGINT 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive/oa/summary';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_scores (
+tdate STRING,
+time STRING, 
+clientip STRING, 
+host STRING, 
+reqmethod STRING,
+useragent STRING,
+resconttype STRING,
+duration INT,
+username STRING, 
+webcat STRING, 
+referer STRING,
+respcode INT,
+uriport INT, 
+uripath STRING,
+uriquery STRING, 
+serverip STRING, 
+scbytes INT, 
+csbytes INT, 
+fulluri STRING,
+word STRING, 
+ml_score FLOAT,
+uri_rep STRING,
+respcode_name STRING,
+network_context STRING 
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive/oa/suspicious';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_storyboard (
+p_threat STRING, 
+title STRING,
+text STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive/oa/storyboard';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_threat_investigation (
+tdate STRING,
+fulluri STRING,
+uri_sev INT
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive/oa/threat_investigation';
+
+
+CREATE EXTERNAL TABLE IF NOT EXISTS ${var:dbname}.proxy_timeline (
+p_threat STRING, 
+tstart STRING, 
+tend STRING, 
+duration BIGINT, 
+clientip STRING, 
+respcode STRING, 
+respcodename STRING
+)
+PARTITIONED BY ( 
+y SMALLINT,
+m TINYINT,
+d TINYINT
+)
+STORED AS PARQUET
+LOCATION '${var:huser}/proxy/hive/oa/timeline';
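
Like the flow tables, every proxy table above is partitioned, so newly written Parquet data only becomes visible once its partition is registered and metadata is refreshed. A hedged sketch using the same impyla connection pattern; the partition values, host and database name are illustrative.

```
# Illustrative only: register a proxy partition and refresh metadata.
from impala.dbapi import connect

conn = connect(host='impala-daemon.example.com', port=21050, database='spotdb')
cursor = conn.cursor()
cursor.execute(
    "ALTER TABLE proxy ADD IF NOT EXISTS "
    "PARTITION (y='2018', m='01', d='18', h='10')"
)
cursor.execute("REFRESH proxy")
cursor.close()
conn.close()
```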


[10/13] incubator-spot git commit: [SPOT-213][SPOT-223] attempt to fix kerberos authentication issue with kerberos.py

Posted by na...@apache.org.
[SPOT-213][SPOT-223] attempt to fix kerberos authentication issue with kerberos.py


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/d7b1d37e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/d7b1d37e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/d7b1d37e

Branch: refs/heads/master
Commit: d7b1d37efc8ea23c35745dbd2fc32d1d5a69854f
Parents: 1582c4c
Author: natedogs911 <na...@gmail.com>
Authored: Fri Jan 19 09:43:05 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Fri Jan 19 09:43:05 2018 -0800

----------------------------------------------------------------------
 spot-ingest/common/kerberos.py | 42 +++++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d7b1d37e/spot-ingest/common/kerberos.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/kerberos.py b/spot-ingest/common/kerberos.py
index 1cdca78..95baef9 100755
--- a/spot-ingest/common/kerberos.py
+++ b/spot-ingest/common/kerberos.py
@@ -17,31 +17,37 @@
 # limitations under the License.
 #
 
-import os
-import subprocess
 import sys
+import os
+import common.configurator as config
+from common.utils import Util
 
-class Kerberos(object):
 
+class Kerberos(object):
     def __init__(self):
 
-        self._kinit =  os.getenv('KINITPATH')
-        self._kinitopts =  os.getenv('KINITOPTS')
-        self._keytab =  os.getenv('KEYTABPATH')
-        self._krb_user =  os.getenv('KRB_USER')
+        self._logger = Util.get_logger('SPOT.COMMON.KERBEROS')
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+
+        if os.getenv('KINITPATH'):
+            self._kinit = os.getenv('KINITPATH')
+        else:
+            self._kinit = "kinit"
+
+        self._kinitopts = os.getenv('KINITOPTS')
+        self._keytab = "-kt {0}".format(keytab)
+        self._krb_user = principal
 
-        if self._kinit == None or self._kinitopts == None or self._keytab == None or self._krb_user == None:
-            print "Please verify kerberos configuration, some environment variables are missing."
+        if self._kinit == None or self._keytab == None or self._krb_user == None:
+            self._logger.error("Please verify kerberos configuration, some environment variables are missing.")
             sys.exit(1)
 
-        self._kinit_args = [self._kinit,self._kinitopts,self._keytab,self._krb_user]
+        if self._kinitopts is None:
+            self._kinit_cmd = "{0} {1} {2}".format(self._kinit, self._keytab, self._krb_user)
+        else:
+            self._kinit_cmd = "{0} {1} {2} {3}".format(self._kinit, self._kinitopts, self._keytab, self._krb_user)
 
-	def authenticate(self):
+    def authenticate(self):
 
-		kinit = subprocess.Popen(self._kinit_args, stderr = subprocess.PIPE)
-		output,error = kinit.communicate()
-		if not kinit.returncode == 0:
-			if error:
-				print error.rstrip()
-				sys.exit(kinit.returncode)
-		print "Successfully authenticated!"
+        Util.execute_cmd(self._kinit_cmd, self._logger)
+        self._logger.info("Kerberos ticket obtained")


[08/13] incubator-spot git commit: [SPOT-213][SPOT-77] updated requirements and documentation to support Kerberos for ingest

Posted by na...@apache.org.
[SPOT-213][SPOT-77] updated requirements and documentation to support Kerberos for ingest


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/13e35fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/13e35fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/13e35fc1

Branch: refs/heads/master
Commit: 13e35fc1fa0a48b6df882341ab3c8e1e98324203
Parents: 49f4934
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 15:24:23 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 15:24:23 2018 -0800

----------------------------------------------------------------------
 spot-ingest/KERBEROS.md               | 50 ++++++++++++++++++++++++++++++
 spot-ingest/README.md                 |  6 ++++
 spot-ingest/kerberos-requirements.txt |  4 +++
 spot-ingest/requirements.txt          |  5 ++-
 4 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/13e35fc1/spot-ingest/KERBEROS.md
----------------------------------------------------------------------
diff --git a/spot-ingest/KERBEROS.md b/spot-ingest/KERBEROS.md
new file mode 100644
index 0000000..2c4c034
--- /dev/null
+++ b/spot-ingest/KERBEROS.md
@@ -0,0 +1,50 @@
+## Kerberos support installation
+
+run the following in addition to the typical installation instructions
+
+### Spot-Ingest
+
+`pip install -r ./spot-ingest/kerberos-requirements.txt`
+
+### Spot-OA
+
+`pip install -r ./spot-ingest/kerberos-requirements.txt`
+
+
+## spot.conf
+
+KERBEROS       =  set `KERBEROS='true'` in /etc/spot.conf to enable kerberos
+KEYTAB         =  should be generated using `ktutil` or another approved method
+SASL_MECH      =  should be set to `sasl_plaintext` unless using ssl
+KAFKA_SERVICE  =  if not set defaults will be used
+
+SSL            =  enable ssl by setting to true
+SSL_VERIFY     =  by setting to `false` disables host checking **important** only recommended in non production environments
+CA_LOCATION    =  location of certificate authority file
+CERT           =  host certificate
+KEY            =  key required for host certificate
+
+sample below:
+
+```
+#kerberos config
+KERBEROS='true'
+KINIT=/usr/bin/kinit
+PRINCIPAL='spot'
+KEYTAB='/opt/security/spot.keytab'
+SASL_MECH='GSSAPI'
+SECURITY_PROTO='sasl_plaintext'
+KAFKA_SERVICE_NAME=''
+
+#ssl config
+SSL='false'
+SSL_VERIFY='true'
+CA_LOCATION=''
+CERT=''
+KEY=''
+
+```
+
+Please see [LIBRDKAFKA Configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
+for reference to additional settings that can be set by modifying `spot-ingest/common/kafka_client.py`
+
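
These keys live in the section-less /etc/spot.conf, which Spot reads by prepending a synthetic [conf] header before handing the file to ConfigParser. Below is a rough, self-contained sketch of that pattern, not the project's configurator module; the quote stripping mirrors how the quoted values in the sample above are handled.

```
# Rough sketch: read the Kerberos keys shown above from /etc/spot.conf.
# The project wraps the file with a SecHead helper; here the [conf]
# header is prepended by hand for illustration (Python 2).
import ConfigParser
import StringIO

with open('/etc/spot.conf') as conf_file:
    raw = '[conf]\n' + conf_file.read()

config = ConfigParser.ConfigParser()
config.readfp(StringIO.StringIO(raw))

def unquote(value):
    return value.replace("'", "").replace('"', '')

if unquote(config.get('conf', 'KERBEROS')).lower() == 'true':
    principal = unquote(config.get('conf', 'PRINCIPAL'))
    keytab = unquote(config.get('conf', 'KEYTAB'))
    print('kinit -kt {0} {1}'.format(keytab, principal))
```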

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/13e35fc1/spot-ingest/README.md
----------------------------------------------------------------------
diff --git a/spot-ingest/README.md b/spot-ingest/README.md
index acfb382..ce4f4cc 100644
--- a/spot-ingest/README.md
+++ b/spot-ingest/README.md
@@ -20,6 +20,12 @@ Ingest data is captured or transferred into the Hadoop cluster, where they are t
 ### Install
 1. Install Python dependencies `pip install -r requirements.txt` 
 
+Optional:
+2. the sasl python package requires the following:
+   * Centos: `yum install cyrus-sasl-devel`
+   * Debian/Ubuntu: `apt-get install libsasl2-dev`
+3. install Python dependencies for Kerberos `pip install -r kerberos-requirements.txt`
+
 ### Configure Kafka
 **Adding Kafka Service:**
 

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/13e35fc1/spot-ingest/kerberos-requirements.txt
----------------------------------------------------------------------
diff --git a/spot-ingest/kerberos-requirements.txt b/spot-ingest/kerberos-requirements.txt
new file mode 100644
index 0000000..ae5ea26
--- /dev/null
+++ b/spot-ingest/kerberos-requirements.txt
@@ -0,0 +1,4 @@
+thrift_sasl==0.2.1
+sasl
+hdfs[kerberos]
+requests-kerberos
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/13e35fc1/spot-ingest/requirements.txt
----------------------------------------------------------------------
diff --git a/spot-ingest/requirements.txt b/spot-ingest/requirements.txt
index 7d04054..71661bc 100644
--- a/spot-ingest/requirements.txt
+++ b/spot-ingest/requirements.txt
@@ -1,2 +1,5 @@
 watchdog
-kafka-python
+confluent-kafka
+impyla
+hdfs
+six >= 1.5


[03/13] incubator-spot git commit: [SPOT-213][SPOT-250][OA][DATA] temp fix for impala calls, add TODO for impyla conversion

Posted by na...@apache.org.
[SPOT-213][SPOT-250][OA][DATA] temp fix for impala calls, add TODO for impyla conversion


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/0e749191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/0e749191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/0e749191

Branch: refs/heads/master
Commit: 0e749191311a3c1695cd40322c1b5788cc56e50c
Parents: d1f5a67
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 11:06:50 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 11:06:50 2018 -0800

----------------------------------------------------------------------
 spot-oa/oa/components/data/hive.py   |  1 +
 spot-oa/oa/components/data/impala.py | 25 +++++++++++++++++++++----
 2 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0e749191/spot-oa/oa/components/data/hive.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/components/data/hive.py b/spot-oa/oa/components/data/hive.py
index a7c1d4b..7d2eaa2 100644
--- a/spot-oa/oa/components/data/hive.py
+++ b/spot-oa/oa/components/data/hive.py
@@ -24,6 +24,7 @@ class Engine(object):
         self._pipeline = pipeline
 
     def query(self,query,output_file=None, delimiter=','):
+        # TODO: fix kerberos compatibility, use impyla
         hive_config = "set mapred.max.split.size=1073741824;set hive.exec.reducers.max=10;set hive.cli.print.header=true;"
         
         del_format = "| sed 's/[\t]/{0}/g'".format(delimiter)

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/0e749191/spot-oa/oa/components/data/impala.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/components/data/impala.py b/spot-oa/oa/components/data/impala.py
index bfc1c5a..10d1f5b 100644
--- a/spot-oa/oa/components/data/impala.py
+++ b/spot-oa/oa/components/data/impala.py
@@ -16,6 +16,8 @@
 #
 
 from subprocess import check_output
+from common import configurator
+
 
 class Engine(object):
 
@@ -24,17 +26,32 @@ class Engine(object):
         self._daemon_node = conf['impala_daemon']
         self._db = db
         self._pipeline = pipeline
-        impala_cmd = "impala-shell -i {0} --quiet -q 'INVALIDATE METADATA {1}.{2}'".format(self._daemon_node,self._db, self._pipeline)
+
+        if configurator.kerberos_enabled():
+            self._impala_shell = "impala-shell -k -i {0} --quiet".format(self._daemon_node)
+        else:
+            self._impala_shell = "impala-shell -i {0} --quiet".format(self._daemon_node)
+
+        impala_cmd = "{0} -q 'INVALIDATE METADATA {1}.{2}'".format(self._impala_shell, self._db, self._pipeline)
         check_output(impala_cmd,shell=True)
     
-        impala_cmd = "impala-shell -i {0} --quiet -q 'REFRESH {1}.{2}'".format(self._daemon_node,self._db, self._pipeline)
+        impala_cmd = "{0} -q 'REFRESH {1}.{2}'".format(self._impala_shell, self._db, self._pipeline)
         check_output(impala_cmd,shell=True)
 
     def query(self,query,output_file=None,delimiter=","):
 
         if output_file:
-            impala_cmd = "impala-shell -i {0} --quiet --print_header -B --output_delimiter='{1}' -q \"{2}\" -o {3}".format(self._daemon_node,delimiter,query,output_file)
+            impala_cmd = "{0} --print_header -B --output_delimiter='{1}' -q \"{2}\" -o {3}".format(
+                self._impala_shell,
+                delimiter,
+                query,
+                output_file
+            )
         else:
-            impala_cmd = "impala-shell -i {0} --quiet --print_header -B --output_delimiter='{1}' -q \"{2}\"".format(self._daemon_node,delimiter,query)
+            impala_cmd = "{0} --print_header -B --output_delimiter='{1}' -q \"{2}\"".format(
+                self._impala_shell,
+                delimiter,
+                query
+            )
 
         check_output(impala_cmd,shell=True)
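
With KERBEROS='true' in /etc/spot.conf the engine above prefixes every call with `impala-shell -k`; otherwise the commands are unchanged. A short usage sketch: the daemon host is a placeholder and the (db, conf, pipeline) argument order is assumed from the attributes the constructor sets.

```
# Usage sketch (assumed constructor order: db, conf, pipeline).
# Import path depends on how spot-oa is laid out on sys.path.
from oa.components.data.impala import Engine

conf = {'impala_daemon': 'impala-daemon.example.com'}   # placeholder host
engine = Engine(db='spotdb', conf=conf, pipeline='flow')

# Writes a delimited result file via "impala-shell [-k] ... -q ... -o ..."
engine.query("SELECT COUNT(*) FROM spotdb.flow",
             output_file='/tmp/flow_count.csv')
```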


[12/13] incubator-spot git commit: [SPOT-213] fix readme location and typo

Posted by na...@apache.org.
[SPOT-213] fix readme location and typo


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/f594956e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/f594956e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/f594956e

Branch: refs/heads/master
Commit: f594956e2b7fa3cd5a09fe2ad2fa5bc697cf347a
Parents: 41e51b8
Author: natedogs911 <na...@gmail.com>
Authored: Tue Jan 23 12:10:54 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Tue Jan 23 12:10:54 2018 -0800

----------------------------------------------------------------------
 spot-ingest/KERBEROS.md | 50 --------------------------------------------
 spot-setup/KERBEROS.md  | 50 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/f594956e/spot-ingest/KERBEROS.md
----------------------------------------------------------------------
diff --git a/spot-ingest/KERBEROS.md b/spot-ingest/KERBEROS.md
deleted file mode 100644
index 2c4c034..0000000
--- a/spot-ingest/KERBEROS.md
+++ /dev/null
@@ -1,50 +0,0 @@
-## Kerberos support installation
-
-run the following in addition to the typical installation instructions
-
-### Spot-Ingest
-
-`pip install -r ./spot-ingest/kerberos-requirements.txt`
-
-### Spot-OA
-
-`pip install -r ./spot-ingest/kerberos-requirements.txt`
-
-
-## spot.conf
-
-KERBEROS       =  set `KERBEROS='true'` in /etc/spot.conf to enable kerberos
-KEYTAB         =  should be generated using `ktutil` or another approved method
-SASL_MECH      =  should be set to `sasl_plaintext` unless using ssl
-KAFKA_SERVICE  =  if not set defaults will be used
-
-SSL            =  enable ssl by setting to true
-SSL_VERIFY     =  by setting to `false` disables host checking **important** only recommended in non production environments
-CA_LOCATION    =  location of certificate authority file
-CERT           =  host certificate
-KEY            =  key required for host certificate
-
-sample below:
-
-```
-#kerberos config
-KERBEROS='true'
-KINIT=/usr/bin/kinit
-PRINCIPAL='spot'
-KEYTAB='/opt/security/spot.keytab'
-SASL_MECH='GSSAPI'
-SECURITY_PROTO='sasl_plaintext'
-KAFKA_SERVICE_NAME=''
-
-#ssl config
-SSL='false'
-SSL_VERIFY='true'
-CA_LOCATION=''
-CERT=''
-KEY=''
-
-```
-
-Please see [LIBRDKAFKA Configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
-for reference to additional settings that can be set by modifying `spot-ingest/common/kafka_client.py`
-

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/f594956e/spot-setup/KERBEROS.md
----------------------------------------------------------------------
diff --git a/spot-setup/KERBEROS.md b/spot-setup/KERBEROS.md
new file mode 100644
index 0000000..a980d1c
--- /dev/null
+++ b/spot-setup/KERBEROS.md
@@ -0,0 +1,50 @@
+## Kerberos support installation
+
+run the following in addition to the typical installation instructions
+
+### Spot-Ingest
+
+`pip install -r ./spot-ingest/kerberos-requirements.txt`
+
+### Spot-OA
+
+`pip install -r ./spot-oa/kerberos-requirements.txt`
+
+
+## spot.conf
+
+KERBEROS       =  set `KERBEROS='true'` in /etc/spot.conf to enable kerberos
+KEYTAB         =  should be generated using `ktutil` or another approved method
+SASL_MECH      =  should be set to `sasl_plaintext` unless using ssl
+KAFKA_SERVICE  =  if not set defaults will be used
+
+SSL            =  enable ssl by setting to true
+SSL_VERIFY     =  by setting to `false` disables host checking **important** only recommended in non production environments
+CA_LOCATION    =  location of certificate authority file
+CERT           =  host certificate
+KEY            =  key required for host certificate
+
+sample below:
+
+```
+#kerberos config
+KERBEROS='true'
+KINIT=/usr/bin/kinit
+PRINCIPAL='spot'
+KEYTAB='/opt/security/spot.keytab'
+SASL_MECH='GSSAPI'
+SECURITY_PROTO='sasl_plaintext'
+KAFKA_SERVICE_NAME=''
+
+#ssl config
+SSL='false'
+SSL_VERIFY='true'
+CA_LOCATION=''
+CERT=''
+KEY=''
+
+```
+
+Please see [LIBRDKAFKA Configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
+for reference to additional settings that can be set by modifying `spot-ingest/common/kafka_client.py`
+


[04/13] incubator-spot git commit: [OA][PEP 8] reformat utils.py

Posted by na...@apache.org.
[OA][PEP 8] reformat utils.py


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/afeb0994
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/afeb0994
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/afeb0994

Branch: refs/heads/master
Commit: afeb0994fe8b46bb510a6947e367391d12930e9e
Parents: 0e74919
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 11:08:56 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 11:08:56 2018 -0800

----------------------------------------------------------------------
 spot-oa/oa/utils.py | 282 +++++++++++++++++++++++------------------------
 1 file changed, 136 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/afeb0994/spot-oa/oa/utils.py
----------------------------------------------------------------------
diff --git a/spot-oa/oa/utils.py b/spot-oa/oa/utils.py
index 2bed10e..8ac6555 100644
--- a/spot-oa/oa/utils.py
+++ b/spot-oa/oa/utils.py
@@ -22,115 +22,114 @@ import csv
 import sys
 import ConfigParser
 
+
 class Util(object):
-	
-	@classmethod
-	def get_logger(cls,logger_name,create_file=False):
-		
-
-		# create logger for prd_ci
-		log = logging.getLogger(logger_name)
-		log.setLevel(level=logging.INFO)
-		
-		# create formatter and add it to the handlers
-		formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-		
-		if create_file:
-				# create file handler for logger.
-				fh = logging.FileHandler('oa.log')
-				fh.setLevel(level=logging.DEBUG)
-				fh.setFormatter(formatter)
-		# reate console handler for logger.
-		ch = logging.StreamHandler()
-		ch.setLevel(level=logging.DEBUG)
-		ch.setFormatter(formatter)
-
-		# add handlers to logger.
-		if create_file:
-			log.addHandler(fh)
-
-		log.addHandler(ch)
-		return  log
-
-	@classmethod
-	def get_spot_conf(cls):
-		
-		conf_file = "/etc/spot.conf"
-		config = ConfigParser.ConfigParser()
-		config.readfp(SecHead(open(conf_file)))	
-
-		return config
-	
-	@classmethod
-	def create_oa_folders(cls,type,date):		
-
-		# create date and ingest summary folder structure if they don't' exist.
-		root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-		data_type_folder = "{0}/data/{1}/{2}"
-		if not os.path.isdir(data_type_folder.format(root_path,type,date)): os.makedirs(data_type_folder.format(root_path,type,date))
-		if not os.path.isdir(data_type_folder.format(root_path,type,"ingest_summary")): os.makedirs(data_type_folder.format(root_path,type,"ingest_summary"))
-
-		# create ipynb folders.
-		ipynb_folder = "{0}/ipynb/{1}/{2}".format(root_path,type,date)
-		if not os.path.isdir(ipynb_folder): os.makedirs(ipynb_folder)
-
-		# retun path to folders.
-		data_path = data_type_folder.format(root_path,type,date)
-		ingest_path = data_type_folder.format(root_path,type,"ingest_summary")		
-		return data_path,ingest_path,ipynb_folder
-	
-	@classmethod
-	def get_ml_results_form_hdfs(cls,hdfs_file_path,local_path):
-
-		# get results from hdfs.
-		get_results_cmd = "hadoop fs -get {0} {1}/.".format(hdfs_file_path,local_path)
-		subprocess.call(get_results_cmd,shell=True)
-		return get_results_cmd
-
-	@classmethod
-	def read_results(cls,file,limit, delimiter=','):
-		
-		# read csv results.
-		result_rows = []
-		with open(file, 'rb') as results_file:
-			csv_reader = csv.reader(results_file, delimiter = delimiter)
-			for i in range(0, int(limit)):
-				try:
-					row = csv_reader.next()
-				except StopIteration:
-					return result_rows
-				result_rows.append(row)
-		return result_rows
-
-	@classmethod
-	def ip_to_int(self,ip):
-		
-		try:
-			o = map(int, ip.split('.'))
-			res = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3]
-			return res    
-
-		except ValueError:
-			return None
-	
-	
-	@classmethod
-	def create_csv_file(cls,full_path_file,content,delimiter=','):   
-		with open(full_path_file, 'w+') as u_file:
-			writer = csv.writer(u_file, quoting=csv.QUOTE_NONE, delimiter=delimiter)
-			writer.writerows(content)
-
-
-	@classmethod
-    	def cast_val(self,value):
-       	    try: 
-            	val = int(value) 
+    @classmethod
+    def get_logger(cls, logger_name, create_file=False):
+
+        # create logger for prd_ci
+        log = logging.getLogger(logger_name)
+        log.setLevel(level=logging.INFO)
+
+        # create formatter and add it to the handlers
+        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+        if create_file:
+            # create file handler for logger.
+            fh = logging.FileHandler('oa.log')
+            fh.setLevel(level=logging.DEBUG)
+            fh.setFormatter(formatter)
+        # create console handler for logger.
+        ch = logging.StreamHandler()
+        ch.setLevel(level=logging.DEBUG)
+        ch.setFormatter(formatter)
+
+        # add handlers to logger.
+        if create_file:
+            log.addHandler(fh)
+
+        log.addHandler(ch)
+        return log
+
+    @classmethod
+    def get_spot_conf(cls):
+
+        conf_file = "/etc/spot.conf"
+        config = ConfigParser.ConfigParser()
+        config.readfp(SecHead(open(conf_file)))
+
+        return config
+
+    @classmethod
+    def create_oa_folders(cls, type, date):
+
+        # create date and ingest summary folder structure if they don't exist.
+        root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+        data_type_folder = "{0}/data/{1}/{2}"
+        if not os.path.isdir(data_type_folder.format(root_path, type, date)): os.makedirs(
+            data_type_folder.format(root_path, type, date))
+        if not os.path.isdir(data_type_folder.format(root_path, type, "ingest_summary")): os.makedirs(
+            data_type_folder.format(root_path, type, "ingest_summary"))
+
+        # create ipynb folders.
+        ipynb_folder = "{0}/ipynb/{1}/{2}".format(root_path, type, date)
+        if not os.path.isdir(ipynb_folder): os.makedirs(ipynb_folder)
+
+        # return path to folders.
+        data_path = data_type_folder.format(root_path, type, date)
+        ingest_path = data_type_folder.format(root_path, type, "ingest_summary")
+        return data_path, ingest_path, ipynb_folder
+
+    @classmethod
+    def get_ml_results_form_hdfs(cls, hdfs_file_path, local_path):
+
+        # get results from hdfs.
+        get_results_cmd = "hadoop fs -get {0} {1}/.".format(hdfs_file_path, local_path)
+        subprocess.call(get_results_cmd, shell=True)
+        return get_results_cmd
+
+    @classmethod
+    def read_results(cls, file, limit, delimiter=','):
+
+        # read csv results.
+        result_rows = []
+        with open(file, 'rb') as results_file:
+            csv_reader = csv.reader(results_file, delimiter=delimiter)
+            for i in range(0, int(limit)):
+                try:
+                    row = csv_reader.next()
+                except StopIteration:
+                    return result_rows
+                result_rows.append(row)
+        return result_rows
+
+    @classmethod
+    def ip_to_int(self, ip):
+
+        try:
+            o = map(int, ip.split('.'))
+            res = (16777216 * o[0]) + (65536 * o[1]) + (256 * o[2]) + o[3]
+            return res
+
+        except ValueError:
+            return None
+
+    @classmethod
+    def create_csv_file(cls, full_path_file, content, delimiter=','):
+        with open(full_path_file, 'w+') as u_file:
+            writer = csv.writer(u_file, quoting=csv.QUOTE_NONE, delimiter=delimiter)
+            writer.writerows(content)
+
+    @classmethod
+    def cast_val(self, value):
+        try:
+            val = int(value)
+        except:
+            try:
+                val = float(value)
             except:
-            	try:
-                    val = float(value) 
-            	except:
-                    val = str(value) 
-            return val    
+                val = str(value)
+        return val
 
 
 class SecHead(object):
@@ -140,47 +139,38 @@ class SecHead(object):
 
     def readline(self):
         if self.sechead:
-            try: 
+            try:
                 return self.sechead
-            finally: 
+            finally:
                 self.sechead = None
-        else: 
+        else:
             return self.fp.readline()
 
-class ProgressBar(object):
-
-	def __init__(self,total,prefix='',sufix='',decimals=2,barlength=60):
-
-		self._total = total
-		self._prefix = prefix
-		self._sufix = sufix
-		self._decimals = decimals
-		self._bar_length = barlength
-		self._auto_iteration_status = 0
-
-	def start(self):
-
-		self._move_progress_bar(0)
-	
-	def update(self,iterator):
-		
-		self._move_progress_bar(iterator)
-
-	def auto_update(self):
-
-		self._auto_iteration_status += 1		
-		self._move_progress_bar(self._auto_iteration_status)
-	
-	def _move_progress_bar(self,iteration):
-
-		filledLength    = int(round(self._bar_length * iteration / float(self._total)))
-		percents        = round(100.00 * (iteration / float(self._total)), self._decimals)
-		bar             = '#' * filledLength + '-' * (self._bar_length - filledLength)	
-		sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix))		
-		sys.stdout.flush()
-		
-		if iteration == self._total:print("\n")
-
-		
-	
 
+class ProgressBar(object):
+    def __init__(self, total, prefix='', sufix='', decimals=2, barlength=60):
+        self._total = total
+        self._prefix = prefix
+        self._sufix = sufix
+        self._decimals = decimals
+        self._bar_length = barlength
+        self._auto_iteration_status = 0
+
+    def start(self):
+        self._move_progress_bar(0)
+
+    def update(self, iterator):
+        self._move_progress_bar(iterator)
+
+    def auto_update(self):
+        self._auto_iteration_status += 1
+        self._move_progress_bar(self._auto_iteration_status)
+
+    def _move_progress_bar(self, iteration):
+        filledLength = int(round(self._bar_length * iteration / float(self._total)))
+        percents = round(100.00 * (iteration / float(self._total)), self._decimals)
+        bar = '#' * filledLength + '-' * (self._bar_length - filledLength)
+        sys.stdout.write("{0} [{1}] {2}% {3}\r".format(self._prefix, bar, percents, self._sufix))
+        sys.stdout.flush()
+
+        if iteration == self._total: print("\n")
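
Behaviour is unchanged by the reformat; a small usage sketch of the two helpers touched here (the import path assumes the spot-oa/oa directory is on sys.path, as the OA modules expect):

```
# Small usage sketch for the reformatted helpers.
from utils import Util, ProgressBar

log = Util.get_logger('SPOT.OA.EXAMPLE')       # console-only logger
log.info('processing started')

bar = ProgressBar(total=100, prefix='progress', sufix='done')
bar.start()
for _ in range(100):
    bar.auto_update()                          # advances one step per call
```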


[05/13] incubator-spot git commit: [SPOT-213][SPOT-250][OA] update requirements for kerberos changes

Posted by na...@apache.org.
[SPOT-213][SPOT-250][OA] update requirements for kerberos changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/8b600c8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/8b600c8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/8b600c8f

Branch: refs/heads/master
Commit: 8b600c8fcc2818223904a2d37f3177ccceb88811
Parents: afeb099
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 11:10:40 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 11:10:40 2018 -0800

----------------------------------------------------------------------
 spot-oa/requirements.txt | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/8b600c8f/spot-oa/requirements.txt
----------------------------------------------------------------------
diff --git a/spot-oa/requirements.txt b/spot-oa/requirements.txt
index 1faa1b6..2596e64 100644
--- a/spot-oa/requirements.txt
+++ b/spot-oa/requirements.txt
@@ -24,3 +24,5 @@ setuptools>=3.4.4
 thrift==0.9.3
 impyla
 hdfs
+requests
+


[02/13] incubator-spot git commit: [SPOT-213][SPOT-250][OA][API] add kerberos support

Posted by na...@apache.org.
[SPOT-213][SPOT-250][OA][API] add kerberos support


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/d1f5a67f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/d1f5a67f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/d1f5a67f

Branch: refs/heads/master
Commit: d1f5a67f929090e2bad865d53d4389c69b176fc5
Parents: 7376c5e
Author: natedogs911 <na...@gmail.com>
Authored: Thu Jan 18 11:02:39 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Thu Jan 18 11:02:39 2018 -0800

----------------------------------------------------------------------
 spot-oa/api/resources/configurator.py  |  69 +++++++++-
 spot-oa/api/resources/hdfs_client.py   | 201 ++++++++++++++++++++++++----
 spot-oa/api/resources/impala_engine.py |  29 +++-
 3 files changed, 262 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/configurator.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/configurator.py b/spot-oa/api/resources/configurator.py
index 5bda045..017732d 100644
--- a/spot-oa/api/resources/configurator.py
+++ b/spot-oa/api/resources/configurator.py
@@ -14,35 +14,90 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import ConfigParser
-import os
+from io import open
+
 
 def configuration():
 
-    conf_file = "/etc/spot.conf"
     config = ConfigParser.ConfigParser()
-    config.readfp(SecHead(open(conf_file)))
+
+    try:
+        conf = open("/etc/spot.conf", "r")
+    except (OSError, IOError) as e:
+        print("Error opening: spot.conf" + " error: " + e.errno)
+        raise e
+
+    config.readfp(SecHead(conf))
     return config
 
+
 def db():
     conf = configuration()
-    return conf.get('conf', 'DBNAME').replace("'","").replace('"','')
+    return conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+
 
 def impala():
     conf = configuration()
-    return conf.get('conf', 'IMPALA_DEM'),conf.get('conf', 'IMPALA_PORT')
+    return conf.get('conf', 'IMPALA_DEM'), conf.get('conf', 'IMPALA_PORT')
+
 
 def hdfs():
     conf = configuration()
     name_node = conf.get('conf',"NAME_NODE")
     web_port = conf.get('conf',"WEB_PORT")
     hdfs_user = conf.get('conf',"HUSER")
-    hdfs_user = hdfs_user.split("/")[-1].replace("'","").replace('"','')
+    hdfs_user = hdfs_user.split("/")[-1].replace("'", "").replace('"', '')
     return name_node,web_port,hdfs_user
 
+
 def spot():
     conf = configuration()
-    return conf.get('conf',"HUSER").replace("'","").replace('"','')
+    return conf.get('conf',"HUSER").replace("'", "").replace('"', '')
+
+
+def kerberos_enabled():
+    conf = configuration()
+    enabled = conf.get('conf', 'KERBEROS').replace("'", "").replace('"', '')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def kerberos():
+    conf = configuration()
+    if kerberos_enabled():
+        principal = conf.get('conf', 'PRINCIPAL')
+        keytab = conf.get('conf', 'KEYTAB')
+        sasl_mech = conf.get('conf', 'SASL_MECH')
+        security_proto = conf.get('conf', 'SECURITY_PROTO')
+        return principal, keytab, sasl_mech, security_proto
+    else:
+        raise KeyError
+
+
+def ssl_enabled():
+    conf = configuration()
+    enabled = conf.get('conf', 'SSL')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def ssl():
+    conf = configuration()
+    if ssl_enabled():
+        ssl_verify = conf.get('conf', 'SSL_VERIFY')
+        ca_location = conf.get('conf', 'CA_LOCATION')
+        cert = conf.get('conf', 'CERT')
+        key = conf.get('conf', 'KEY')
+        return ssl_verify, ca_location, cert, key
+    else:
+        raise KeyError
+
 
 class SecHead(object):
     def __init__(self, fp):

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/hdfs_client.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/hdfs_client.py b/spot-oa/api/resources/hdfs_client.py
index 31c5eba..e7f6bec 100644
--- a/spot-oa/api/resources/hdfs_client.py
+++ b/spot-oa/api/resources/hdfs_client.py
@@ -14,63 +14,216 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from hdfs import InsecureClient
+
 from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
 from json import dump
-import api.resources.configurator as Config
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
+class Progress(object):
+
+    """Basic progress tracker callback."""
+
+    def __init__(self, hdfs_path, nbytes):
+        self._data = {}
+        self._lock = Lock()
+        self._hpath = hdfs_path
+        self._nbytes = nbytes
+
+    def __call__(self):
+        with self._lock:
+            if self._nbytes >= 0:
+                self._data[self._hpath] = self._nbytes
+            else:
+                stderr.write('%s\n' % (sum(self._data.values()), ))
+
+
+class SecureKerberosClient(KerberosClient):
+
+    """A new client subclass for handling HTTPS connections with Kerberos.
+
+    :param url: URL to namenode.
+    :param cert: Local certificate. See `requests` documentation for details
+      on how to use this.
+    :param verify: Whether to check the host's certificate. WARNING: non production use only
+    :param \*\*kwargs: Keyword arguments passed to the default `Client`
+      constructor.
+
+    """
+
+    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
+
+        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
+        session = Session()
+
+        if verify == 'true':
+            self._logger.info('SSL verification enabled')
+            session.verify = True
+            if cert is not None:
+                self._logger.info('SSL Cert: ' + cert)
+                if ',' in cert:
+                    session.cert = [path.strip() for path in cert.split(',')]
+                else:
+                    session.cert = cert
+        elif verify == 'false':
+            session.verify = False
+
+        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
 
+class HdfsException(HdfsError):
+    def __init__(self, message):
+        super(HdfsException, self).__init__(message)
+        self.message = message
+
+
+def get_client(user=None):
+    # type: (object) -> Client
+
+    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
+    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
+    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)}
+
+    if Config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = Config.ssl()
+        conf.update({'verify': ssl_verify.lower()})
+        if cert:
+            conf.update({'cert': cert})
+
+    if Config.kerberos_enabled():
+        krb_conf = {'mutual_auth': 'OPTIONAL'}
+        conf.update(krb_conf)
+
+    # TODO: possible user parameter
+    logger.info('Client conf:')
+    for k,v in conf.iteritems():
+        logger.info(k + ': ' + v)
+
+    client = SecureKerberosClient(**conf)
 
-def _get_client(user=None):
-    hdfs_nm,hdfs_port,hdfs_user = Config.hdfs()
-    client = InsecureClient('http://{0}:{1}'.format(hdfs_nm,hdfs_port), user= user if user else hdfs_user)
     return client
 
-def get_file(hdfs_file):
-    client = _get_client()
+
+def get_file(hdfs_file, client=None):
+    if not client:
+        client = get_client()
+
     with client.read(hdfs_file) as reader:
         results = reader.read()
         return results
 
-def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-    
+
+def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        result = client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress)
+        return result
+    except HdfsError as err:
+        return err
+
+
+def download_file(hdfs_path, local_path, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.download(hdfs_path, local_path, overwrite=overwrite)
+        return True
+    except HdfsError:
+        return False
+
+
+def mkdir(hdfs_path, client=None):
+    if client is not None:
+        client = get_client()
+
+    try:
+        client.makedirs(hdfs_path)
+        return True
+    except HdfsError:
+        return False
+
+
+def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
         with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file) as writer:
             for item in hdfs_file_content:
                 data = ','.join(str(d) for d in item)
                 writer.write("{0}\n".format(data))
         return True
-        
+
     except HdfsError:
         return False
 
-def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False):
-    
+
+def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name)
         with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file,encoding='utf-8') as writer:
-	        dump(hdfs_file_content, writer)
+            dump(hdfs_file_content, writer)
         return True
     except HdfsError:
         return False
-    
 
-def delete_folder(hdfs_file,user=None):
-    client = _get_client(user)
-    client.delete(hdfs_file,recursive=True)
 
-def list_dir(hdfs_path):
+def delete_folder(hdfs_file, user=None, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.delete(hdfs_file,recursive=True)
+    except HdfsError:
+        return False
+
+
+def check_dir(hdfs_path, client=None):
+    """
+    Returns True if directory exists
+    Returns False if directory does not exist
+    : param hdfs_path: path to check
+    : object client: hdfs client object for persistent connection
+    """
+    if not client:
+        client = get_client()
+
+    result = client.list(hdfs_path)
+    if None not in result:
+        return True
+    else:
+        return False
+
+
+def list_dir(hdfs_path, client=None):
+    if not client:
+        client = get_client()
+
     try:
-        client = _get_client()
         return client.list(hdfs_path)
     except HdfsError:
         return {}
 
-def file_exists(hdfs_path,file_name):
-    files = list_dir(hdfs_path)
+
+def file_exists(hdfs_path, file_name, client=None):
+    if not client:
+        client = get_client()
+
+    files = list_dir(client, hdfs_path)
     if str(file_name) in files:
-	    return True
+        return True
     else:
         return False
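
A usage sketch of the reworked module: when no client is passed in, each call builds one from spot.conf, so long-running callers can create a single client and reuse it. The HDFS path below is a placeholder.

```
# Usage sketch: build one client from spot.conf and reuse it across calls.
import api.resources.hdfs_client as hdfs

client = hdfs.get_client()

hdfs.put_file_csv([[1, 'a'], [2, 'b']],
                  '/user/spot/flow/example',    # placeholder HDFS path
                  'sample.csv',
                  overwrite_file=True,
                  client=client)

print(hdfs.list_dir('/user/spot/flow/example', client=client))
print(hdfs.get_file('/user/spot/flow/example/sample.csv', client=client))
```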

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/impala_engine.py
----------------------------------------------------------------------
diff --git a/spot-oa/api/resources/impala_engine.py b/spot-oa/api/resources/impala_engine.py
index b7d0148..542bbd0 100644
--- a/spot-oa/api/resources/impala_engine.py
+++ b/spot-oa/api/resources/impala_engine.py
@@ -15,15 +15,33 @@
 # limitations under the License.
 #
 from impala.dbapi import connect
-import api.resources.configurator as Config
+import api.resources.configurator as config
+
 
 def create_connection():
 
-    impala_host, impala_port =  Config.impala()
-    db = Config.db()
-    conn = connect(host=impala_host, port=int(impala_port),database=db)
+    impala_host, impala_port = config.impala()
+    conf = {}
+
+    # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default
+    service_name = {'kerberos_service_name': 'impala'}
+
+    if config.kerberos_enabled():
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+        conf.update({'auth_mechanism': 'GSSAPI',
+                     })
+
+    if config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = config.ssl()
+        conf.update({'ca_cert': cert,
+                     'use_ssl': ssl_verify
+                     })
+
+    db = config.db()
+    conn = connect(host=impala_host, port=int(impala_port), database=db, **conf)
     return conn.cursor()
 
+
 def execute_query(query,fetch=False):
 
     impala_cursor = create_connection()
@@ -31,6 +49,7 @@ def execute_query(query,fetch=False):
 
     return impala_cursor if not fetch else impala_cursor.fetchall()
 
+
 def execute_query_as_list(query):
 
     query_results = execute_query(query)
@@ -46,5 +65,3 @@ def execute_query_as_list(query):
         row_result = {}
 
     return results
-
-
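
Callers keep the same interface; a minimal sketch of the two query helpers with an illustrative query (the import path assumes spot-oa is on sys.path):

```
# Minimal sketch of the impala_engine helpers; query text is illustrative.
from api.resources import impala_engine

rows = impala_engine.execute_query(
    "SELECT sip, dip, ibyt FROM flow WHERE y=2018 AND m=1 AND d=18 LIMIT 5",
    fetch=True)

dicts = impala_engine.execute_query_as_list(
    "SELECT sip, dip, ibyt FROM flow WHERE y=2018 AND m=1 AND d=18 LIMIT 5")
```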


[11/13] incubator-spot git commit: [SPOT-213][SPOT-77][SPOT-221] Update for spot-ingest to support Kerberos, implements hive client and Librdkafka support

Posted by na...@apache.org.
[SPOT-213][SPOT-77][SPOT-221] Update for spot-ingest to support Kerberos, implements hive client and Librdkafka support


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/41e51b8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/41e51b8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/41e51b8f

Branch: refs/heads/master
Commit: 41e51b8fab0ba7ebccba10e8e3052c7131cb43dc
Parents: d7b1d37
Author: natedogs911 <na...@gmail.com>
Authored: Tue Jan 23 11:49:40 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Tue Jan 23 11:49:40 2018 -0800

----------------------------------------------------------------------
 spot-ingest/common/kafka_client.py       | 193 ++++++++++++++++++++------
 spot-ingest/master_collector.py          |  21 +--
 spot-ingest/pipelines/dns/collector.py   | 133 +++++++++++-------
 spot-ingest/pipelines/dns/worker.py      | 141 ++++++++++++++-----
 spot-ingest/pipelines/flow/collector.py  | 111 +++++++++------
 spot-ingest/pipelines/flow/worker.py     | 193 ++++++++++++++++++++------
 spot-ingest/pipelines/proxy/collector.py |   6 +-
 spot-ingest/worker.py                    |   6 +-
 8 files changed, 588 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/common/kafka_client.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/kafka_client.py b/spot-ingest/common/kafka_client.py
index 977cb92..15441b2 100755
--- a/spot-ingest/common/kafka_client.py
+++ b/spot-ingest/common/kafka_client.py
@@ -19,23 +19,23 @@
 
 import logging
 import os
+import sys
 from common.utils import Util
-from kafka import KafkaProducer
-from kafka import KafkaConsumer as KC
-from kafka.partitioner.roundrobin import RoundRobinPartitioner
-from kafka.common import TopicPartition
+from confluent_kafka import Producer
+from confluent_kafka import Consumer
+import common.configurator as config
 
-class KafkaTopic(object):
 
+class KafkaProducer(object):
 
-    def __init__(self,topic,server,port,zk_server,zk_port,partitions):
+    def __init__(self, topic, server, port, zk_server, zk_port, partitions):
 
-        self._initialize_members(topic,server,port,zk_server,zk_port,partitions)
+        self._initialize_members(topic, server, port, zk_server, zk_port, partitions)
 
-    def _initialize_members(self,topic,server,port,zk_server,zk_port,partitions):
+    def _initialize_members(self, topic, server, port, zk_server, zk_port, partitions):
 
         # get logger isinstance
-        self._logger = logging.getLogger("SPOT.INGEST.KAFKA")
+        self._logger = logging.getLogger("SPOT.INGEST.KafkaProducer")
 
         # kafka requirements
         self._server = server
@@ -46,42 +46,93 @@ class KafkaTopic(object):
         self._num_of_partitions = partitions
         self._partitions = []
         self._partitioner = None
+        self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
 
         # create topic with partitions
         self._create_topic()
 
-    def _create_topic(self):
-
-        self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic,self._num_of_partitions))     
+        self._kafka_conf = self._producer_config(self._kafka_brokers)
+
+        self._p = Producer(**self._kafka_conf)
+
+    def _producer_config(self, server):
+        # type: (str) -> dict
+        """Returns a configuration dictionary containing optional values"""
+
+        connection_conf = {
+            'bootstrap.servers': server,
+        }
+
+        if os.environ.get('KAFKA_DEBUG'):
+            connection_conf.update({'debug': 'all'})
+
+        if config.kerberos_enabled():
+            self._logger.info('Kerberos enabled')
+            principal, keytab, sasl_mech, security_proto = config.kerberos()
+            connection_conf.update({
+                'sasl.mechanisms': sasl_mech,
+                'security.protocol': security_proto,
+                'sasl.kerberos.principal': principal,
+                'sasl.kerberos.keytab': keytab,
+                'sasl.kerberos.min.time.before.relogin': 6000
+            })
+
+            sn = os.environ.get('KAFKA_SERVICE_NAME')
+            if sn:
+                self._logger.info('Setting Kerberos service name: ' + sn)
+                connection_conf.update({'sasl.kerberos.service.name': sn})
+
+            kinit_cmd = os.environ.get('KAFKA_KINIT')
+            if kinit_cmd:
+                self._logger.info('using kinit command: ' + kinit_cmd)
+                connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+            else:
+                # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+                # resulting in authentication errors for other services
+                connection_conf.update({
+                    'sasl.kerberos.kinit.cmd': 'kinit -S "%{sasl.kerberos.service.name}/%{broker.name}" -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+                })
+
+        if config.ssl_enabled():
+            self._logger.info('Using SSL connection settings')
+            ssl_verify, ca_location, cert, key = config.ssl()
+            connection_conf.update({
+                'ssl.certificate.location': cert,
+                'ssl.ca.location': ca_location,
+                'ssl.key.location': key
+            })
+
+        return connection_conf
 
-        # Create partitions for the workers.
-        self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]        
+    def _create_topic(self):
 
-        # create partitioner
-        self._partitioner = RoundRobinPartitioner(self._partitions)
+        self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic, self._num_of_partitions))
         
         # get script path 
-        zk_conf = "{0}:{1}".format(self._zk_server,self._zk_port)
-        create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(os.path.dirname(os.path.abspath(__file__)),self._topic,zk_conf,self._num_of_partitions)
+        zk_conf = "{0}:{1}".format(self._zk_server, self._zk_port)
+        create_topic_cmd = "{0}/kafka_topic.sh create {1} {2} {3}".format(
+            os.path.dirname(os.path.abspath(__file__)),
+            self._topic,
+            zk_conf,
+            self._num_of_partitions
+        )
 
         # execute create topic cmd
-        Util.execute_cmd(create_topic_cmd,self._logger)
+        Util.execute_cmd(create_topic_cmd, self._logger)
 
-    def send_message(self,message,topic_partition):
+    def SendMessage(self, message, topic):
+        p = self._p
+        p.produce(topic, message.encode('utf-8'), callback=self._delivery_callback)
+        p.poll(0)
+        p.flush(timeout=3600000)
 
-        self._logger.info("Sending message to: Topic: {0} Partition:{1}".format(self._topic,topic_partition))
-        kafka_brokers = '{0}:{1}'.format(self._server,self._port)             
-        producer = KafkaProducer(bootstrap_servers=[kafka_brokers],api_version_auto_timeout_ms=3600000)
-        future = producer.send(self._topic,message,partition=topic_partition)
-        producer.flush(timeout=3600000)
-        producer.close()
-    
     @classmethod
-    def SendMessage(cls,message,kafka_servers,topic,partition=0):
-        producer = KafkaProducer(bootstrap_servers=kafka_servers,api_version_auto_timeout_ms=3600000)
-        future = producer.send(topic,message,partition=partition)
-        producer.flush(timeout=3600000)
-        producer.close()  
+    def _delivery_callback(cls, err, msg):
+        if err:
+            sys.stderr.write('%% Message failed delivery: %s\n' % err)
+        else:
+            sys.stderr.write('%% Message delivered to %s [%d]\n' %
+                             (msg.topic(), msg.partition()))
 
     @property
     def Topic(self):
@@ -93,22 +144,24 @@ class KafkaTopic(object):
 
     @property
     def Zookeeper(self):
-        zk = "{0}:{1}".format(self._zk_server,self._zk_port)
+        zk = "{0}:{1}".format(self._zk_server, self._zk_port)
         return zk
 
     @property
     def BootstrapServers(self):
-        servers = "{0}:{1}".format(self._server,self._port) 
+        servers = "{0}:{1}".format(self._server, self._port)
         return servers
 
 
 class KafkaConsumer(object):
     
-    def __init__(self,topic,server,port,zk_server,zk_port,partition):
+    def __init__(self, topic, server, port, zk_server, zk_port, partition):
+
+        self._initialize_members(topic, server, port, zk_server, zk_port, partition)
 
-        self._initialize_members(topic,server,port,zk_server,zk_port,partition)
+    def _initialize_members(self, topic, server, port, zk_server, zk_port, partition):
 
-    def _initialize_members(self,topic,server,port,zk_server,zk_port,partition):
+        self._logger = logging.getLogger("SPOT.INGEST.KafkaConsumer")
 
         self._topic = topic
         self._server = server
@@ -116,14 +169,64 @@ class KafkaConsumer(object):
         self._zk_server = zk_server
         self._zk_port = zk_port
         self._id = partition
+        self._kafka_brokers = '{0}:{1}'.format(self._server, self._port)
+        self._kafka_conf = self._consumer_config(self._id, self._kafka_brokers)
+
+    def _consumer_config(self, groupid, server):
+        # type: (dict) -> dict
+        """Returns a configuration dictionary containing optional values"""
+
+        connection_conf = {
+            'bootstrap.servers': server,
+            'group.id': groupid,
+        }
+
+        if config.kerberos_enabled():
+            self._logger.info('Kerberos enabled')
+            principal, keytab, sasl_mech, security_proto = config.kerberos()
+            connection_conf.update({
+                'sasl.mechanisms': sasl_mech,
+                'security.protocol': security_proto,
+                'sasl.kerberos.principal': principal,
+                'sasl.kerberos.keytab': keytab,
+                'sasl.kerberos.min.time.before.relogin': 6000,
+                'default.topic.config': {
+                    'auto.commit.enable': 'true',
+                    'auto.commit.interval.ms': '60000',
+                    'auto.offset.reset': 'smallest'}
+            })
+
+            sn = os.environ.get('KAFKA_SERVICE_NAME')
+            if sn:
+                self._logger.info('Setting Kerberos service name: ' + sn)
+                connection_conf.update({'sasl.kerberos.service.name': sn})
+
+            kinit_cmd = os.environ.get('KAFKA_KINIT')
+            if kinit_cmd:
+                self._logger.info('using kinit command: ' + kinit_cmd)
+                connection_conf.update({'sasl.kerberos.kinit.cmd': kinit_cmd})
+            else:
+                # Using -S %{sasl.kerberos.service.name}/%{broker.name} causes the ticket cache to refresh
+                # resulting in authentication errors for other services
+                connection_conf.update({
+                    'sasl.kerberos.kinit.cmd': 'kinit -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}'
+                })
+
+        if config.ssl_enabled():
+            self._logger.info('Using SSL connection settings')
+            ssl_verify, ca_location, cert, key = config.ssl()
+            connection_conf.update({
+                'ssl.certificate.location': cert,
+                'ssl.ca.location': ca_location,
+                'ssl.key.location': key
+            })
+
+        return connection_conf
 
     def start(self):
-        
-        kafka_brokers = '{0}:{1}'.format(self._server,self._port)
-        consumer =  KC(bootstrap_servers=[kafka_brokers],group_id=self._topic)
-        partition = [TopicPartition(self._topic,int(self._id))]
-        consumer.assign(partitions=partition)
-        consumer.poll()
+
+        consumer = Consumer(**self._kafka_conf)
+        consumer.subscribe([self._topic])
         return consumer
 
     @property
@@ -132,6 +235,4 @@ class KafkaConsumer(object):
 
     @property
     def ZookeperServer(self):
-        return "{0}:{1}".format(self._zk_server,self._zk_port)
-
-    
+        return "{0}:{1}".format(self._zk_server, self._zk_port)

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/master_collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/master_collector.py b/spot-ingest/master_collector.py
index 6f6ff7c..23be9f4 100755
--- a/spot-ingest/master_collector.py
+++ b/spot-ingest/master_collector.py
@@ -24,14 +24,15 @@ import sys
 import datetime
 from common.utils import Util
 from common.kerberos import Kerberos
-from common.kafka_client import KafkaTopic
-
+import common.configurator as Config
+from common.kafka_client import KafkaProducer
 
 # get master configuration.
 SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
 CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
 MASTER_CONF = json.loads(open(CONF_FILE).read())
 
+
 def main():
 
     # input Parameters
@@ -49,6 +50,7 @@ def main():
     # start collector based on data source type.
     start_collector(args.type, args.workers_num, args.ingest_id)
 
+
 def start_collector(type, workers_num, id=None):
 
     # generate ingest id
@@ -68,7 +70,7 @@ def start_collector(type, workers_num, id=None):
         sys.exit(1)
 
     # validate if kerberos authentication is required.
-    if os.getenv('KRB_AUTH'):
+    if Config.kerberos_enabled():
         kb = Kerberos()
         kb.authenticate()
 
@@ -80,17 +82,20 @@ def start_collector(type, workers_num, id=None):
     # required zookeeper info.
     zk_server = MASTER_CONF["kafka"]['zookeper_server']
     zk_port = MASTER_CONF["kafka"]['zookeper_port']
-
-    topic = "SPOT-INGEST-{0}_{1}".format(type, ingest_id) if not id else id
-    kafka = KafkaTopic(topic, k_server, k_port, zk_server, zk_port, workers_num)
+
+    topic = "{0}".format(type) if not id else id
+    producer = KafkaProducer(topic, k_server, k_port, zk_server, zk_port, workers_num)
 
     # create a collector instance based on data source type.
     logger.info("Starting {0} ingest instance".format(topic))
-    module = __import__("pipelines.{0}.collector".format(MASTER_CONF["pipelines"][type]["type"]), fromlist=['Collector'])
+    module = __import__("pipelines.{0}.collector".
+                        format(MASTER_CONF["pipelines"][type]["type"]),
+                        fromlist=['Collector'])
 
     # start collector.
-    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], kafka, type)
+    ingest_collector = module.Collector(MASTER_CONF['hdfs_app_path'], producer, type)
     ingest_collector.start()
 
+
 if __name__ == '__main__':
     main()

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/dns/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/dns/collector.py b/spot-ingest/pipelines/dns/collector.py
index c421c47..97c5ed6 100755
--- a/spot-ingest/pipelines/dns/collector.py
+++ b/spot-ingest/pipelines/dns/collector.py
@@ -18,26 +18,29 @@
 #
 
 import time
+import logging
 import os
-import subprocess
 import json
-import logging
 from multiprocessing import Process
 from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
 from common.file_collector import FileWatcher
 from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
 
 class Collector(object):
 
-    def __init__(self, hdfs_app_path, kafka_topic, conf_type):
-        self._initialize_members(hdfs_app_path, kafka_topic, conf_type)
+    def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
+
+        self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+    def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
 
-    def _initialize_members(self, hdfs_app_path, kafka_topic, conf_type):
         # getting parameters.
         self._logger = logging.getLogger('SPOT.INGEST.DNS')
         self._hdfs_app_path = hdfs_app_path
-        self._kafka_topic = kafka_topic
+        self._producer = kafkaproducer
 
         # get script path
         self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -64,6 +67,8 @@ class Collector(object):
         self._processes = conf["collector_processes"]
         self._ingestion_interval = conf["ingestion_interval"]
         self._pool = Pool(processes=self._processes)
+        # TODO: review re-use of hdfs.client
+        self._hdfs_client = hdfs.get_client()
 
     def start(self):
 
@@ -74,74 +79,108 @@ class Collector(object):
             while True:
                 self._ingest_files_pool()
                 time.sleep(self._ingestion_interval)
-
         except KeyboardInterrupt:
             self._logger.info("Stopping DNS collector...")
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper, self._kafka_topic.Topic, self._logger)
+            Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
             self._watcher.stop()
             self._pool.terminate()
             self._pool.close()
             self._pool.join()
             SystemExit("Ingest finished...")
 
-
     def _ingest_files_pool(self):
+
         if self._watcher.HasFiles:
+
             for x in range(0, self._processes):
-                file = self._watcher.GetNextFile()
-                resutl = self._pool.apply_async(ingest_file, args=(file, self._pkt_num, self._pcap_split_staging, self._kafka_topic.Partition, self._hdfs_root_path, self._kafka_topic.Topic, self._kafka_topic.BootstrapServers, ))
-                #resutl.get() # to debug add try and catch.
-                if  not self._watcher.HasFiles: break    
+                self._logger.info('processes: {0}'.format(self._processes))
+                new_file = self._watcher.GetNextFile()
+                if self._processes <= 1:
+                    _ingest_file(
+                        self._hdfs_client,
+                        new_file,
+                        self._pkt_num,
+                        self._pcap_split_staging,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                        )
+                else:
+                    result = self._pool.apply_async(_ingest_file, args=(
+                        self._hdfs_client,
+                        new_file,
+                        self._pkt_num,
+                        self._pcap_split_staging,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                        ))
+                # result.get()  # to debug, add try/except.
+                if not self._watcher.HasFiles:
+                    break
         return True
 
-def ingest_file(file,pkt_num,pcap_split_staging, partition,hdfs_root_path,topic,kafka_servers):
+
+def _ingest_file(hdfs_client, new_file, pkt_num, pcap_split_staging, hdfs_root_path, producer, topic):
 
     logger = logging.getLogger('SPOT.INGEST.DNS.{0}'.format(os.getpid()))
     
     try:
         # get file name and date.
-        org_file = file
-        file_name_parts = file.split('/')
+        org_file = new_file
+        file_name_parts = new_file.split('/')
         file_name = file_name_parts[len(file_name_parts)-1]
 
         # split file.
         name = file_name.split('.')[0]
-        split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,file,pcap_split_staging,name)
+        split_cmd = "editcap -c {0} {1} {2}/{3}_spot.pcap".format(pkt_num,
+                                                                  new_file,
+                                                                  pcap_split_staging,
+                                                                  name)
         logger.info("Splitting file: {0}".format(split_cmd))
         Util.execute_cmd(split_cmd,logger)
 
         logger.info("Removing file: {0}".format(org_file))
         rm_big_file = "rm {0}".format(org_file)
-        Util.execute_cmd(rm_big_file,logger)    
-
-        for currdir,subdir,files in os.walk(pcap_split_staging):
-            for file in files:
-                if file.endswith(".pcap") and "{0}_spot".format(name) in file:
-
-                        # get timestamp from the file name to build hdfs path.
-                        file_date = file.split('.')[0]
-                        pcap_hour = file_date[-6:-4]
-                        pcap_date_path = file_date[-14:-6]
-
-                        # hdfs path with timestamp.
-                        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,pcap_date_path,pcap_hour)
-
-                        # create hdfs path.
-                        Util.creat_hdfs_folder(hdfs_path,logger)
-
-                        # load file to hdfs.
-                        hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
-                        Util.load_to_hdfs(os.path.join(currdir,file),hadoop_pcap_file,logger)
-
-                        # create event for workers to process the file.
-                        logger.info( "Sending split file to worker number: {0}".format(partition))
-                        KafkaTopic.SendMessage(hadoop_pcap_file,kafka_servers,topic,partition)
-                        logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
+        Util.execute_cmd(rm_big_file,logger)
 
-  
     except Exception as err:
-        
-        logger.error("There was a problem, please check the following error message:{0}".format(err.message))
+        logger.error("There was a problem splitting the file: {0}".format(err.message))
         logger.error("Exception: {0}".format(err))
 
+    for currdir, subdir, files in os.walk(pcap_split_staging):
+        for file in files:
+            if file.endswith(".pcap") and "{0}_spot".format(name) in file:
+                # get timestamp from the file name to build hdfs path.
+                file_date = file.split('.')[0]
+                pcap_hour = file_date[-6:-4]
+                pcap_date_path = file_date[-14:-6]
+
+                # hdfs path with timestamp.
+                hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, pcap_date_path, pcap_hour)
+
+                # create hdfs path.
+                try:
+                    if len(hdfs.list_dir(hdfs_path, hdfs_client)) == 0:
+                        logger.info('creating directory: ' + hdfs_path)
+                        hdfs.mkdir(hdfs_path, hdfs_client)
+
+                    # load file to hdfs.
+                    hadoop_pcap_file = "{0}/{1}".format(hdfs_path,file)
+                    result = hdfs.upload_file(hadoop_pcap_file, os.path.join(currdir, file), client=hdfs_client)
+                    if not result:
+                        logger.error('File failed to upload: ' + hadoop_pcap_file)
+                        raise HdfsException
+
+                    # create event for workers to process the file.
+                    logger.info( "Sending split file to Topic: {0}".format(topic))
+                    producer.SendMessage(hadoop_pcap_file, topic)
+                    logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
+
+                except HdfsException as err:
+                    logger.error('Exception: ' + err.message)
+                    logger.info('Check Hdfs Connection settings and server health')
+
+                except Exception as err:
+                    logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(new_file,topic))
+                    logger.error("Error: {0}".format(err))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/dns/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/dns/worker.py b/spot-ingest/pipelines/dns/worker.py
index 6f51f45..f23fa8f 100755
--- a/spot-ingest/pipelines/dns/worker.py
+++ b/spot-ingest/pipelines/dns/worker.py
@@ -21,18 +21,22 @@ import logging
 import datetime
 import subprocess
 import json
+import sys
 import os
 from multiprocessing import Process
 from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
 
 
 class Worker(object):
 
-    def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
+    def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
         
-        self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+        self._initialize_members(db_name,hdfs_app_path, kafka_consumer, conf_type)
 
-    def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+    def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
 
         # get logger instance.
         self._logger = Util.get_logger('SPOT.INGEST.WRK.DNS')
@@ -44,32 +48,58 @@ class Worker(object):
         self._script_path = os.path.dirname(os.path.abspath(__file__))
         conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
         conf = json.loads(open(conf_file).read())
-        self._conf = conf["pipelines"][conf_type] 
+        self._conf = conf["pipelines"][conf_type]
+        self._id = "spot-{0}-worker".format(conf_type)
 
         self._process_opt = self._conf['process_opt']
         self._local_staging = self._conf['local_staging']
         self.kafka_consumer = kafka_consumer
 
+        self._cursor = hive_engine.create_connection()
+
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
-
-    def _new_file(self,file):
-
-        self._logger.info("-------------------------------------- New File received --------------------------------------")
+        consumer = self.kafka_consumer.start()
+
+        try:
+            while True:
+                message = consumer.poll(timeout=1.0)
+                if message is None:
+                    continue
+                if not message.error():
+                    self._new_file(message.value().decode('utf-8'))
+                elif message.error():
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        continue
+                    else:
+                        raise KafkaException(message.error())
+
+        except KeyboardInterrupt:
+            sys.stderr.write('%% Aborted by user\n')
+
+        consumer.close()
+
+    def _new_file(self, nf):
+
+        self._logger.info(
+            "-------------------------------------- New File received --------------------------------------"
+        )
         self._logger.info("File: {0} ".format(file))        
-        p = Process(target=self._process_new_file, args=(file,))
+        p = Process(target=self._process_new_file, args=nf)
         p.start() 
         p.join()
 
-    def _process_new_file(self,file):
+    def _process_new_file(self, nf):
+
 
         # get file from hdfs
-        get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
-        self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
-        Util.execute_cmd(get_file_cmd,self._logger)
+        self._logger.info("Getting file from hdfs: {0}".format(nf))
+        if hdfs.file_exists(os.path.dirname(nf), os.path.basename(nf)):
+            hdfs.download_file(nf, self._local_staging)
+        else:
+            self._logger.info("file: {0} not found".format(nf))
+            # TODO: error handling
 
         # get file name and date
-        file_name_parts = file.split('/')
+        file_name_parts = nf.split('/')
@@ -82,37 +112,86 @@ class Worker(object):
         binary_day = binary_date_path[6:8]
 
         # build process cmd.
-        process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+        process_cmd = "tshark -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging, file_name, self._process_opt)
         self._logger.info("Processing file: {0}".format(process_cmd))
-        Util.execute_cmd(process_cmd,self._logger)
+        Util.execute_cmd(process_cmd, self._logger)
 
         # create hdfs staging.
         hdfs_path = "{0}/dns".format(self._hdfs_app_path)
         staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
         hdfs_staging_path =  "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
-        create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
-        self._logger.info("Creating staging: {0}".format(create_staging_cmd))
-        Util.execute_cmd(create_staging_cmd,self._logger)
+        self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+        hdfs.mkdir(hdfs_staging_path)
 
         # move to stage.
-        mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
-        self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
-        Util.execute_cmd(mv_to_staging,self._logger)
+        local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+        self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+        hdfs.upload_file(hdfs_staging_path, local_file)
 
         #load to avro
-        load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/dns/load_dns_avro_parquet.hql".format(self._db_name,binary_year,binary_month,binary_day,binary_hour,hdfs_staging_path)
-
-        self._logger.info("Loading data to hive: {0}".format(load_to_avro_cmd))
-        Util.execute_cmd(load_to_avro_cmd,self._logger)
+        drop_table = 'DROP TABLE IF EXISTS {0}.dns_tmp'.format(self._db_name)
+        self._cursor.execute(drop_table)
+
+        # Create external table
+        create_external = ("\n"
+                           "CREATE EXTERNAL TABLE {0}.dns_tmp (\n"
+                           "  frame_day STRING,\n"
+                           "  frame_time STRING,\n"
+                           "  unix_tstamp BIGINT,\n"
+                           "  frame_len INT,\n"
+                           "  ip_src STRING,\n"
+                           "  ip_dst STRING,\n"
+                           "  dns_qry_name STRING,\n"
+                           "  dns_qry_type INT,\n"
+                           "  dns_qry_class STRING,\n"
+                           "  dns_qry_rcode INT,\n"
+                           "  dns_a STRING  \n"
+                           "  )\n"
+                           "  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+                           "  STORED AS TEXTFILE\n"
+                           "  LOCATION '{1}'\n"
+                           "  TBLPROPERTIES ('avro.schema.literal'='{{\n"
+                           "  \"type\":   \"record\"\n"
+                           "  , \"name\":   \"RawDnsRecord\"\n"
+                           "  , \"namespace\" : \"com.cloudera.accelerators.dns.avro\"\n"
+                           "  , \"fields\": [\n"
+                           "      {{\"name\": \"frame_day\",        \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"frame_time\",     \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"unix_tstamp\",    \"type\":[\"bigint\", \"null\"]}\n"
+                           "      , {{\"name\": \"frame_len\",      \"type\":[\"int\",    \"null\"]}\n"
+                           "      , {{\"name\": \"ip_src\",         \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"ip_dst\",         \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_name\",   \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_type\",   \"type\":[\"int\",    \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_class\",  \"type\":[\"string\", \"null\"]}\n"
+                           "      , {{\"name\": \"dns_qry_rcode\",  \"type\":[\"int\",    \"null\"]}\n"
+                           "      , {{\"name\": \"dns_a\",          \"type\":[\"string\", \"null\"]}\n"
+                           "      ]\n"
+                           "}')\n"
+                           ).format(self._db_name, hdfs_staging_path)
+        self._logger.info( "Creating external table: {0}".format(create_external))
+        self._cursor.execute(create_external)
+
+        # Insert data
+        insert_into_table = """
+            INSERT INTO TABLE {0}.dns
+            PARTITION (y={1}, m={2}, d={3}, h={4})
+            SELECT   CONCAT(frame_day , frame_time) as treceived, unix_tstamp, frame_len, ip_dst, ip_src, dns_qry_name,
+            dns_qry_class,dns_qry_type, dns_qry_rcode, dns_a 
+            FROM {0}.dns_tmp
+        """.format(self._db_name,binary_year,binary_month,binary_day,binary_hour)
+        self._logger.info( "Loading data to {0}: {1}"
+                           .format(self._db_name, insert_into_table)
+                           )
+        self._cursor.execute(insert_into_table)
 
         # remove from hdfs staging
-        rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
-        self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
-        Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+        self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+        hdfs.delete_folder(hdfs_staging_path)
 
         # remove from local staging.
         rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
         self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
-        Util.execute_cmd(rm_local_staging,self._logger)
+        Util.execute_cmd(rm_local_staging, self._logger)
 
         self._logger.info("File {0} was successfully processed.".format(file_name))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/flow/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/flow/collector.py b/spot-ingest/pipelines/flow/collector.py
index b9a97f2..5e5cd49 100755
--- a/spot-ingest/pipelines/flow/collector.py
+++ b/spot-ingest/pipelines/flow/collector.py
@@ -23,22 +23,24 @@ import os
 import json
 from multiprocessing import Process
 from common.utils import Util
+from common import hdfs_client as hdfs
+from common.hdfs_client import HdfsException
 from common.file_collector import FileWatcher
 from multiprocessing import Pool
-from common.kafka_client import KafkaTopic
+
 
 class Collector(object):
 
-    def __init__(self,hdfs_app_path,kafka_topic,conf_type):
+    def __init__(self, hdfs_app_path, kafkaproducer, conf_type):
         
-        self._initialize_members(hdfs_app_path,kafka_topic,conf_type)
+        self._initialize_members(hdfs_app_path, kafkaproducer, conf_type)
+
+    def _initialize_members(self, hdfs_app_path, kafkaproducer, conf_type):
 
-    def _initialize_members(self,hdfs_app_path,kafka_topic,conf_type):
-  
         # getting parameters.
         self._logger = logging.getLogger('SPOT.INGEST.FLOW')
         self._hdfs_app_path = hdfs_app_path
-        self._kafka_topic = kafka_topic
+        self._producer = kafkaproducer
 
         # get script path
         self._script_path = os.path.dirname(os.path.abspath(__file__))
@@ -62,6 +64,8 @@ class Collector(object):
         self._processes = conf["collector_processes"]
         self._ingestion_interval = conf["ingestion_interval"]
         self._pool = Pool(processes=self._processes)
+        # TODO: review re-use of hdfs.client
+        self._hdfs_client = hdfs.get_client()
 
     def start(self):
 
@@ -74,54 +78,83 @@ class Collector(object):
                 time.sleep(self._ingestion_interval)
         except KeyboardInterrupt:
             self._logger.info("Stopping FLOW collector...")  
-            Util.remove_kafka_topic(self._kafka_topic.Zookeeper,self._kafka_topic.Topic,self._logger)          
+            Util.remove_kafka_topic(self._producer.Zookeeper, self._producer.Topic, self._logger)
             self._watcher.stop()
             self._pool.terminate()
             self._pool.close()            
             self._pool.join()
             SystemExit("Ingest finished...")
-    
 
     def _ingest_files_pool(self):            
        
         if self._watcher.HasFiles:
             
-            for x in range(0,self._processes):
-                file = self._watcher.GetNextFile()
-                resutl = self._pool.apply_async(ingest_file,args=(file,self._kafka_topic.Partition,self._hdfs_root_path ,self._kafka_topic.Topic,self._kafka_topic.BootstrapServers,))
-                #resutl.get() # to debug add try and catch.
-                if  not self._watcher.HasFiles: break    
+            for x in range(0, self._processes):
+                self._logger.info('processes: {0}'.format(self._processes))
+                new_file = self._watcher.GetNextFile()
+                if self._processes <= 1:
+                    _ingest_file(
+                                 new_file,
+                                 self._hdfs_root_path,
+                                 self._producer,
+                                 self._producer.Topic
+                                 )
+                else:
+                    result = self._pool.apply_async(_ingest_file, args=(
+                        new_file,
+                        self._hdfs_root_path,
+                        self._producer,
+                        self._producer.Topic
+                    ))
+                    # result.get()  # to debug add try and catch.
+                if not self._watcher.HasFiles:
+                    break
         return True
-    
 
 
-def ingest_file(file,partition,hdfs_root_path,topic,kafka_servers):
+def _ingest_file(new_file, hdfs_root_path, producer, topic):
 
-        logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
-
-        try:
+    logger = logging.getLogger('SPOT.INGEST.FLOW.{0}'.format(os.getpid()))
 
-            # get file name and date.
-            file_name_parts = file.split('/')
-            file_name = file_name_parts[len(file_name_parts)-1]
-            file_date = file_name.split('.')[1]
+    try:
 
-            file_date_path = file_date[0:8]
-            file_date_hour = file_date[8:10]
+        # get file name and date.
+        file_name_parts = new_file.split('/')
+        file_name = file_name_parts[len(file_name_parts)-1]
+        file_date = file_name.split('.')[1]
+        file_date_path = file_date[0:8]
+        file_date_hour = file_date[8:10]
 
-            # hdfs path with timestamp.
-            hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path,file_date_path,file_date_hour)
-            Util.creat_hdfs_folder(hdfs_path,logger)
+        # hdfs path with timestamp.
+        hdfs_path = "{0}/binary/{1}/{2}".format(hdfs_root_path, file_date_path, file_date_hour)
+        hdfs_file = "{0}/{1}".format(hdfs_path, file_name)
 
-            # load to hdfs.
-            hdfs_file = "{0}/{1}".format(hdfs_path,file_name)
-            Util.load_to_hdfs(file,hdfs_file,logger)
-
-            # create event for workers to process the file.
-            logger.info("Sending file to worker number: {0}".format(partition))
-            KafkaTopic.SendMessage(hdfs_file,kafka_servers,topic,partition)    
-            logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(file,topic))
-
-        except Exception as err:
-            logger.error("There was a problem, please check the following error message:{0}".format(err.message))
-            logger.error("Exception: {0}".format(err))
+        try:
+            if len(hdfs.list_dir(hdfs_path)) == 0:
+                logger.info('creating directory: ' + hdfs_path)
+                hdfs.mkdir(hdfs_path)
+            logger.info('uploading file to hdfs: ' + hdfs_file)
+            result = hdfs.upload_file(hdfs_path, new_file)
+            if not result:
+                logger.error('File failed to upload: ' + hdfs_file)
+                raise HdfsException
+            else:
+                rm_file = "rm {0}".format(new_file)
+                logger.info("Removing files from local staging: {0}".format(rm_file))
+                Util.execute_cmd(rm_file, logger)
+
+        except HdfsException as err:
+            logger.error('Exception: ' + err.message)
+            logger.info('Check Hdfs Connection settings and server health')
+
+    except Exception as err:
+        logger.error("There was a problem, Exception: {0}".format(err))
+
+        # create event for workers to process the file.
+        # logger.info("Sending file to worker number: {0}".format(partition))
+    try:
+        producer.SendMessage(hdfs_file, topic)
+        logger.info("File {0} has been successfully sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+    except Exception as err:
+        logger.info("File {0} failed to be sent to Kafka Topic to: {1}".format(hdfs_file, topic))
+        logger.error("Error: {0}".format(err))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/flow/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/flow/worker.py b/spot-ingest/pipelines/flow/worker.py
index 1630022..bb957a5 100755
--- a/spot-ingest/pipelines/flow/worker.py
+++ b/spot-ingest/pipelines/flow/worker.py
@@ -22,17 +22,20 @@ import subprocess
 import datetime
 import logging
 import os
-import json 
+import json
+import sys
 from multiprocessing import Process
 from common.utils import Util
+from common import hive_engine
+from common import hdfs_client as hdfs
+from confluent_kafka import KafkaError, KafkaException
 
 
 class Worker(object):
 
-    def __init__(self,db_name,hdfs_app_path,kafka_consumer,conf_type,processes=None):
-        self._initialize_members(db_name,hdfs_app_path,kafka_consumer,conf_type)
+    def __init__(self, db_name, hdfs_app_path, kafka_consumer, conf_type, processes=None):
+        self._initialize_members(db_name, hdfs_app_path, kafka_consumer, conf_type)
 
-    def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
+    def _initialize_members(self, db_name, hdfs_app_path, kafka_consumer, conf_type):
 
         # get logger instance.
         self._logger = Util.get_logger('SPOT.INGEST.WRK.FLOW')
@@ -45,76 +48,186 @@ class Worker(object):
         conf_file = "{0}/ingest_conf.json".format(os.path.dirname(os.path.dirname(self._script_path)))
         conf = json.loads(open(conf_file).read())
         self._conf = conf["pipelines"][conf_type]
+        self._id = "spot-{0}-worker".format(conf_type)
 
         self._process_opt = self._conf['process_opt']
         self._local_staging = self._conf['local_staging']
         self.kafka_consumer = kafka_consumer
 
+        # self._cursor = hive_engine.create_connection()
+        self._cursor = hive_engine
+
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
-
-    def _new_file(self,file):
-
-        self._logger.info("-------------------------------------- New File received --------------------------------------")
-        self._logger.info("File: {0} ".format(file))        
-        p = Process(target=self._process_new_file, args=(file,))
+        consumer = self.kafka_consumer.start()
+        try:
+            while True:
+                message = consumer.poll(timeout=1.0)
+                if message is None:
+                    continue
+                if not message.error():
+                    self._new_file(message.value().decode('utf-8'))
+                elif message.error():
+                    if message.error().code() == KafkaError._PARTITION_EOF:
+                        continue
+                    else:
+                        raise KafkaException(message.error())
+
+        except KeyboardInterrupt:
+            sys.stderr.write('%% Aborted by user\n')
+
+        consumer.close()
+
+    def _new_file(self, nf):
+
+        self._logger.info(
+            "-------------------------------------- New File received --------------------------------------"
+        )
+        self._logger.info("File: {0} ".format(nf))
+
+        p = Process(target=self._process_new_file, args=(nf, ))
         p.start()
         p.join()
         
-    def _process_new_file(self,file):
-
-        # get file from hdfs
-        get_file_cmd = "hadoop fs -get {0} {1}.".format(file,self._local_staging)
-        self._logger.info("Getting file from hdfs: {0}".format(get_file_cmd))
-        Util.execute_cmd(get_file_cmd,self._logger)
+    def _process_new_file(self, nf):
 
         # get file name and date
-        file_name_parts = file.split('/')
+        file_name_parts = nf.split('/')
         file_name = file_name_parts[len(file_name_parts)-1]
-
+        nf_path = os.path.dirname(nf)
         flow_date = file_name.split('.')[1]
         flow_year = flow_date[0:4]
         flow_month = flow_date[4:6]
         flow_day = flow_date[6:8]
         flow_hour = flow_date[8:10]
 
+        # get file from hdfs
+        if hdfs.file_exists(nf_path, file_name):
+            self._logger.info("Getting file from hdfs: {0}".format(nf))
+            hdfs.download_file(nf, self._local_staging)
+        else:
+            self._logger.info("file: {0} not found".format(nf))
+            # TODO: error handling
+
         # build process cmd.
-        process_cmd = "nfdump -o csv -r {0}{1} {2} > {0}{1}.csv".format(self._local_staging,file_name,self._process_opt)
+        sf = "{0}{1}.csv".format(self._local_staging,file_name)
+        process_cmd = "nfdump -o csv -r {0}{1} {2} > {3}".format(self._local_staging, file_name, self._process_opt, sf)
         self._logger.info("Processing file: {0}".format(process_cmd))
-        Util.execute_cmd(process_cmd,self._logger)        
+        Util.execute_cmd(process_cmd,self._logger)
 
         # create hdfs staging.
         hdfs_path = "{0}/flow".format(self._hdfs_app_path)
         staging_timestamp = datetime.datetime.now().strftime('%M%S%f')[:-4]
-        hdfs_staging_path =  "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
-        create_staging_cmd = "hadoop fs -mkdir -p {0}".format(hdfs_staging_path)
-        self._logger.info("Creating staging: {0}".format(create_staging_cmd))
-        Util.execute_cmd(create_staging_cmd,self._logger)
+        hdfs_staging_path = "{0}/stage/{1}".format(hdfs_path,staging_timestamp)
+        self._logger.info("Creating staging: {0}".format(hdfs_staging_path))
+        hdfs.mkdir(hdfs_staging_path)
 
         # move to stage.
-        mv_to_staging ="hadoop fs -moveFromLocal {0}{1}.csv {2}/.".format(self._local_staging,file_name,hdfs_staging_path)
-        self._logger.info("Moving data to staging: {0}".format(mv_to_staging))
-        subprocess.call(mv_to_staging,shell=True)
-
-        #load to avro
-        load_to_avro_cmd = "hive -hiveconf dbname={0} -hiveconf y={1} -hiveconf m={2} -hiveconf d={3} -hiveconf h={4} -hiveconf data_location='{5}' -f pipelines/flow/load_flow_avro_parquet.hql".format(self._db_name,flow_year,flow_month,flow_day,flow_hour,hdfs_staging_path)
-
-        self._logger.info( "Loading data to hive: {0}".format(load_to_avro_cmd))
-        Util.execute_cmd(load_to_avro_cmd,self._logger)
+        local_file = "{0}{1}.csv".format(self._local_staging, file_name)
+        self._logger.info("Moving data to staging: {0}".format(hdfs_staging_path))
+        hdfs.upload_file(hdfs_staging_path, local_file)
+
+        # load with impyla
+        drop_table = "DROP TABLE IF EXISTS {0}.flow_tmp".format(self._db_name)
+        self._logger.info( "Dropping temp table: {0}".format(drop_table))
+        self._cursor.execute_query(drop_table)
+
+        create_external = ("\n"
+                           "CREATE EXTERNAL TABLE {0}.flow_tmp (\n"
+                           "  treceived STRING,\n"
+                           "  tryear INT,\n"
+                           "  trmonth INT,\n"
+                           "  trday INT,\n"
+                           "  trhour INT,\n"
+                           "  trminute INT,\n"
+                           "  trsec INT,\n"
+                           "  tdur FLOAT,\n"
+                           "  sip  STRING,\n"
+                           "  dip STRING,\n"
+                           "  sport INT,\n"
+                           "  dport INT,\n"
+                           "  proto STRING,\n"
+                           "  flag STRING,\n"
+                           "  fwd INT,\n"
+                           "  stos INT,\n"
+                           "  ipkt BIGINT,\n"
+                           "  ibyt BIGINT,\n"
+                           "  opkt BIGINT,\n"
+                           "  obyt BIGINT,\n"
+                           "  input INT,\n"
+                           "  output INT,\n"
+                           "  sas INT,\n"
+                           "  das INT,\n"
+                           "  dtos INT,\n"
+                           "  dir INT,\n"
+                           "  rip STRING\n"
+                           "  )\n"
+                           "  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+                           "  STORED AS TEXTFILE\n"
+                           "  LOCATION '{1}'\n"
+                           "  TBLPROPERTIES ('avro.schema.literal'='{{\n"
+                           "  \"type\":   \"record\"\n"
+                           "  , \"name\":   \"RawFlowRecord\"\n"
+                           "  , \"namespace\" : \"com.cloudera.accelerators.flows.avro\"\n"
+                           "  , \"fields\": [\n"
+                           "      {{\"name\": \"treceived\",             \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"tryear\",              \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trmonth\",             \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trday\",               \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trhour\",              \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trminute\",            \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"trsec\",               \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"tdur\",                \"type\":[\"float\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sport\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dport\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"proto\",              \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"flag\",               \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"fwd\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"stos\",                  \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"ipkt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"ibytt\",              \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"opkt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"obyt\",               \"type\":[\"bigint\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"input\",                 \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"output\",                \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"sas\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"das\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dtos\",                  \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"dir\",                   \"type\":[\"int\",   \"null\"]}}\n"
+                           "      ,  {{\"name\": \"rip\",                \"type\":[\"string\",   \"null\"]}}\n"
+                           "      ]\n"
+                           "}}')\n"
+                           ).format(self._db_name, hdfs_staging_path)
+        self._logger.info( "Creating external table: {0}".format(create_external))
+        self._cursor.execute_query(create_external)
+
+        insert_into_table = """
+        INSERT INTO TABLE {0}.flow
+        PARTITION (y={1}, m={2}, d={3}, h={4})
+        SELECT   treceived,  unix_timestamp(treceived) AS unix_tstamp, tryear,  trmonth, trday,  trhour,  trminute,  trsec,
+          tdur,  sip, dip, sport, dport,  proto,  flag,  fwd,  stos,  ipkt,  ibyt,  opkt,  obyt,  input,  output,
+          sas,  das,  dtos,  dir,  rip
+        FROM {0}.flow_tmp
+        """.format(self._db_name, flow_year, flow_month, flow_day, flow_hour)
+        self._logger.info( "Loading data to {0}: {1}"
+                           .format(self._db_name, insert_into_table)
+                           )
+        self._cursor.execute_query(insert_into_table)
 
         # remove from hdfs staging
-        rm_hdfs_staging_cmd = "hadoop fs -rm -R -skipTrash {0}".format(hdfs_staging_path)
-        self._logger.info("Removing staging path: {0}".format(rm_hdfs_staging_cmd))
-        Util.execute_cmd(rm_hdfs_staging_cmd,self._logger)
+        self._logger.info("Removing staging path: {0}".format(hdfs_staging_path))
+        hdfs.delete_folder(hdfs_staging_path)
 
         # remove from local staging.
         rm_local_staging = "rm {0}{1}".format(self._local_staging,file_name)
         self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
         Util.execute_cmd(rm_local_staging,self._logger)
 
-        self._logger.info("File {0} was successfully processed.".format(file_name))
-
+        rm_local_staging = "rm {0}".format(sf)
+        self._logger.info("Removing files from local staging: {0}".format(rm_local_staging))
+        Util.execute_cmd(rm_local_staging,self._logger)
 
+        self._logger.info("File {0} was successfully processed.".format(file_name))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/pipelines/proxy/collector.py
----------------------------------------------------------------------
diff --git a/spot-ingest/pipelines/proxy/collector.py b/spot-ingest/pipelines/proxy/collector.py
index 69d708c..008b568 100644
--- a/spot-ingest/pipelines/proxy/collector.py
+++ b/spot-ingest/pipelines/proxy/collector.py
@@ -23,7 +23,7 @@ import os
 import sys
 import copy
 from common.utils import Util, NewFileEvent
-from common.kafka_client import KafkaTopic
+from common.kafka_client import KafkaProducer
 from multiprocessing import Pool
 from common.file_collector import FileWatcher
 import time
@@ -106,10 +106,10 @@ def ingest_file(file,message_size,topic,kafka_servers):
             for line in f:
                 message += line
                 if len(message) > message_size:
-                    KafkaTopic.SendMessage(message,kafka_servers,topic,0)
+                    KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
                     message = ""
             #send the last package.        
-            KafkaTopic.SendMessage(message,kafka_servers,topic,0)            
+            KafkaProducer.SendMessage(message, kafka_servers, topic, 0)
         rm_file = "rm {0}".format(file)
         Util.execute_cmd(rm_file,logger)
         logger.info("File {0} has been successfully sent to Kafka Topic: {1}".format(file,topic))

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/41e51b8f/spot-ingest/worker.py
----------------------------------------------------------------------
diff --git a/spot-ingest/worker.py b/spot-ingest/worker.py
index 5c29148..ce758c5 100755
--- a/spot-ingest/worker.py
+++ b/spot-ingest/worker.py
@@ -24,11 +24,13 @@ import sys
 from common.utils import Util
 from common.kerberos import Kerberos
 from common.kafka_client import KafkaConsumer
+import common.configurator as Config
 
 SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
 CONF_FILE = "{0}/ingest_conf.json".format(SCRIPT_PATH)
 WORKER_CONF = json.loads(open(CONF_FILE).read())
 
+
 def main():
 
     # input parameters
@@ -63,8 +65,8 @@ def start_worker(type, topic, id, processes=None):
         logger.error("The provided data source {0} is not valid".format(type))
         sys.exit(1)
 
-    # validate if kerberos authentication is requiered.
-    if os.getenv('KRB_AUTH'):
+    # validate if kerberos authentication is required.
+    if Config.kerberos_enabled():
         kb = Kerberos()
         kb.authenticate()
 


[09/13] incubator-spot git commit: [SPOT-213] [INGEST] adding common functions for hdfs, hive with support for kerberos

Posted by na...@apache.org.
[SPOT-213] [INGEST] adding common functions for hdfs, hive with support for kerberos


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/1582c4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/1582c4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/1582c4c1

Branch: refs/heads/master
Commit: 1582c4c1ad10358d672f0ebef5d2c88ac225c65b
Parents: 13e35fc
Author: natedogs911 <na...@gmail.com>
Authored: Fri Jan 19 09:40:50 2018 -0800
Committer: natedogs911 <na...@gmail.com>
Committed: Fri Jan 19 09:40:50 2018 -0800

----------------------------------------------------------------------
 spot-ingest/common/configurator.py | 119 ++++++++++++++++
 spot-ingest/common/hdfs_client.py  | 233 ++++++++++++++++++++++++++++++++
 spot-ingest/common/hive_engine.py  |  73 ++++++++++
 3 files changed, 425 insertions(+)
----------------------------------------------------------------------
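
As a rough usage sketch (not part of the commit) of the new common helpers named in the
summary above: settings are read from /etc/spot.conf, and hive_engine.create_connection()
is assumed to return a cursor-like object, as the DNS worker uses it; the query is a placeholder.

    from common import configurator
    from common import hive_engine

    # Settings are read from /etc/spot.conf by the configurator module.
    db_name = configurator.db()
    if configurator.kerberos_enabled():
        principal, keytab, sasl_mech, security_proto = configurator.kerberos()

    cursor = hive_engine.create_connection()               # assumed cursor-like object
    cursor.execute('SHOW TABLES IN {0}'.format(db_name))   # placeholder statement
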


http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/configurator.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/configurator.py b/spot-ingest/common/configurator.py
new file mode 100644
index 0000000..6fe0ede
--- /dev/null
+++ b/spot-ingest/common/configurator.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import ConfigParser
+from io import open
+
+
+def configuration():
+
+    config = ConfigParser.ConfigParser()
+
+    try:
+        conf = open("/etc/spot.conf", "r")
+    except (OSError, IOError) as e:
+        print("Error opening: spot.conf" + " error: " + e.errno)
+        raise e
+
+    config.readfp(SecHead(conf))
+    return config
+
+
+def db():
+    conf = configuration()
+    return conf.get('conf', 'DBNAME').replace("'", "").replace('"', '')
+
+
+def impala():
+    conf = configuration()
+    return conf.get('conf', 'IMPALA_DEM'), conf.get('conf', 'IMPALA_PORT')
+
+
+def hive():
+    conf = configuration()
+    return conf.get('conf', 'HS2_HOST'), conf.get('conf', 'HS2_PORT')
+
+
+def hdfs():
+    conf = configuration()
+    name_node = conf.get('conf',"NAME_NODE")
+    web_port = conf.get('conf',"WEB_PORT")
+    hdfs_user = conf.get('conf',"HUSER")
+    hdfs_user = hdfs_user.split("/")[-1].replace("'", "").replace('"', '')
+    return name_node,web_port,hdfs_user
+
+
+def spot():
+    conf = configuration()
+    return conf.get('conf',"HUSER").replace("'", "").replace('"', '')
+
+
+def kerberos_enabled():
+    conf = configuration()
+    enabled = conf.get('conf', 'KERBEROS').replace("'", "").replace('"', '')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def kerberos():
+    conf = configuration()
+    if kerberos_enabled():
+        principal = conf.get('conf', 'PRINCIPAL')
+        keytab = conf.get('conf', 'KEYTAB')
+        sasl_mech = conf.get('conf', 'SASL_MECH')
+        security_proto = conf.get('conf', 'SECURITY_PROTO')
+        return principal, keytab, sasl_mech, security_proto
+    else:
+        raise KeyError
+
+
+def ssl_enabled():
+    conf = configuration()
+    enabled = conf.get('conf', 'SSL').replace("'", "").replace('"', '')
+    if enabled.lower() == 'true':
+        return True
+    else:
+        return False
+
+
+def ssl():
+    conf = configuration()
+    if ssl_enabled():
+        ssl_verify = conf.get('conf', 'SSL_VERIFY')
+        ca_location = conf.get('conf', 'CA_LOCATION')
+        cert = conf.get('conf', 'CERT')
+        key = conf.get('conf', 'KEY')
+        return ssl_verify, ca_location, cert, key
+    else:
+        raise KeyError
+
+
+class SecHead(object):
+    def __init__(self, fp):
+        self.fp = fp
+        self.sechead = '[conf]\n'
+
+    def readline(self):
+        if self.sechead:
+            try:
+                return self.sechead
+            finally:
+                self.sechead = None
+        else:
+            return self.fp.readline()
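
To illustrate the SecHead wrapper above: ConfigParser requires a section header, so the
wrapper injects '[conf]' before the first readline of the section-less /etc/spot.conf.
A small, hedged sketch; the option names mirror the ones the module looks up and the
values are made up:

    import ConfigParser
    from StringIO import StringIO

    from common.configurator import SecHead

    sample = StringIO("DBNAME='spotdb'\nKERBEROS='false'\nSSL='false'\n")
    parser = ConfigParser.ConfigParser()
    parser.readfp(SecHead(sample))          # '[conf]' is prepended on the first read
    print(parser.get('conf', 'DBNAME'))     # -> 'spotdb' with quotes; db() strips them
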

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/hdfs_client.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/hdfs_client.py b/spot-ingest/common/hdfs_client.py
new file mode 100644
index 0000000..5605e9c
--- /dev/null
+++ b/spot-ingest/common/hdfs_client.py
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from hdfs import InsecureClient
+from hdfs.util import HdfsError
+from hdfs import Client
+from hdfs.ext.kerberos import KerberosClient
+from requests import Session
+from json import dump
+from threading import Lock
+import logging
+import configurator as Config
+from sys import stderr
+
+
+class Progress(object):
+
+    """Basic progress tracker callback."""
+
+    def __init__(self, hdfs_path, nbytes):
+        self._data = {}
+        self._lock = Lock()
+        self._hpath = hdfs_path
+        self._nbytes = nbytes
+
+    def __call__(self):
+        with self._lock:
+            if self._nbytes >= 0:
+                self._data[self._hpath] = self._nbytes
+            else:
+                stderr.write('%s\n' % (sum(self._data.values()), ))
+
+
+class SecureKerberosClient(KerberosClient):
+
+    """A new client subclass for handling HTTPS connections with Kerberos.
+
+    :param url: URL to namenode.
+    :param cert: Local certificate. See `requests` documentation for details
+      on how to use this.
+    :param verify: Whether to check the host's certificate. WARNING: non production use only
+    :param \*\*kwargs: Keyword arguments passed to the default `Client`
+      constructor.
+
+    """
+
+    def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs):
+
+        self._logger = logging.getLogger("SPOT.INGEST.HDFS_client")
+        session = Session()
+
+        if verify == 'true':
+            self._logger.info('SSL verification enabled')
+            session.verify = True
+            if cert is not None:
+                self._logger.info('SSL Cert: ' + cert)
+                if ',' in cert:
+                    session.cert = [path.strip() for path in cert.split(',')]
+                else:
+                    session.cert = cert
+        elif verify == 'false':
+            session.verify = False
+
+        super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs)
+
+
+class HdfsException(HdfsError):
+    def __init__(self, message):
+        super(HdfsException, self).__init__(message)
+        self.message = message
+
+
+def get_client(user=None):
+    # type: (object) -> Client
+
+    logger = logging.getLogger('SPOT.INGEST.HDFS.get_client')
+    hdfs_nm, hdfs_port, hdfs_user = Config.hdfs()
+    conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port),
+            'mutual_auth': 'OPTIONAL'
+            }
+
+    if Config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = Config.ssl()
+        conf.update({'verify': ssl_verify.lower()})
+        if cert:
+            conf.update({'cert': cert})
+
+    if Config.kerberos_enabled():
+        # TODO: handle other conditions
+        krb_conf = {'mutual_auth': 'OPTIONAL'}
+        conf.update(krb_conf)
+
+    # TODO: possible user parameter
+    logger.info('Client conf:')
+    for k,v in conf.iteritems():
+        logger.info(k + ': ' + v)
+
+    client = SecureKerberosClient(**conf)
+
+    return client
+
+
+def get_file(hdfs_file, client=None):
+    if not client:
+        client = get_client()
+
+    with client.read(hdfs_file) as reader:
+        results = reader.read()
+        return results
+
+
+def upload_file(hdfs_fp, local_fp, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        result = client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress())
+        return result
+    except HdfsError as err:
+        return err
+
+
+def download_file(hdfs_path, local_path, overwrite=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.download(hdfs_path, local_path, overwrite=overwrite)
+        return True
+    except HdfsError:
+        return False
+
+
+def mkdir(hdfs_path, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        client.makedirs(hdfs_path)
+        return True
+    except HdfsError:
+        return False
+
+
+def put_file_csv(hdfs_file_content, hdfs_path, hdfs_file_name, append_file=False, overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        hdfs_full_name = "{0}/{1}".format(hdfs_path, hdfs_file_name)
+        with client.write(hdfs_full_name, append=append_file, overwrite=overwrite_file) as writer:
+            for item in hdfs_file_content:
+                data = ','.join(str(d) for d in item)
+                writer.write("{0}\n".format(data))
+        return True
+
+    except HdfsError:
+        return False
+
+
+def put_file_json(hdfs_file_content, hdfs_path, hdfs_file_name, append_file=False, overwrite_file=False, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        hdfs_full_name = "{0}/{1}".format(hdfs_path, hdfs_file_name)
+        with client.write(hdfs_full_name, append=append_file, overwrite=overwrite_file, encoding='utf-8') as writer:
+            dump(hdfs_file_content, writer)
+        return True
+    except HdfsError:
+        return False
+
+
+def delete_folder(hdfs_file, user=None, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        # client.delete returns True on success and False if the path did not exist
+        return client.delete(hdfs_file, recursive=True)
+    except HdfsError:
+        return False
+
+
+def check_dir(hdfs_path, client=None):
+    """
+    Returns True if the directory exists, False if it does not.
+    :param hdfs_path: path to check
+    :param client: hdfs client object for a persistent connection
+    """
+    if not client:
+        client = get_client()
+
+    try:
+        client.list(hdfs_path)
+        return True
+    except HdfsError:
+        return False
+
+
+def list_dir(hdfs_path, client=None):
+    if not client:
+        client = get_client()
+
+    try:
+        return client.list(hdfs_path)
+    except HdfsError:
+        return {}
+
+
+def file_exists(hdfs_path, file_name, client=None):
+    if not client:
+        client = get_client()
+
+    files = list_dir(hdfs_path, client)
+    return str(file_name) in files
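
For reference, a minimal usage sketch of the helpers above, assuming the module lands as spot-ingest's common/hdfs_client.py; the HDFS directory and local file below are placeholders, and the configuration read by configurator.py must point at a reachable WebHDFS endpoint:

    import common.hdfs_client as hdfs

    # build one client up front and pass it to each helper instead of creating a new one per call
    client = hdfs.get_client()

    hdfs_dir = '/user/spot/sample/stage'   # placeholder path
    if not hdfs.check_dir(hdfs_dir, client=client):
        hdfs.mkdir(hdfs_dir, client=client)

    # push a local file into the directory and confirm it arrived
    hdfs.upload_file(hdfs_dir, '/tmp/sample.csv', overwrite=True, client=client)
    print(hdfs.file_exists(hdfs_dir, 'sample.csv', client=client))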

http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/1582c4c1/spot-ingest/common/hive_engine.py
----------------------------------------------------------------------
diff --git a/spot-ingest/common/hive_engine.py b/spot-ingest/common/hive_engine.py
new file mode 100644
index 0000000..eb3d79e
--- /dev/null
+++ b/spot-ingest/common/hive_engine.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from impala.dbapi import connect
+import common.configurator as config
+
+
+def create_connection():
+
+    host, port = config.hive()
+    conf = {}
+
+    # impyla defaults the Kerberos service name to 'impala'; Hive requires it to be 'hive'
+    conf.update({'kerberos_service_name': 'hive'})
+
+    if config.kerberos_enabled():
+        principal, keytab, sasl_mech, security_proto = config.kerberos()
+        conf.update({'auth_mechanism': 'GSSAPI'})
+    else:
+        conf.update({'auth_mechanism': 'PLAIN'})
+
+    if config.ssl_enabled():
+        ssl_verify, ca_location, cert, key = config.ssl()
+        if ssl_verify.lower() == 'false':
+            # SSL stays on, but no CA certificate is supplied so the server is not verified
+            conf.update({'use_ssl': True})
+        else:
+            conf.update({'ca_cert': cert,
+                         'use_ssl': True})
+
+    db = config.db()
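+    # connect to the configured Spot database and hand back a cursor ready for execute()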
+    conn = connect(host=host, port=int(port), database=db, **conf)
+    return conn.cursor()
+
+
+def execute_query(query, fetch=False):
+
+    impala_cursor = create_connection()
+    impala_cursor.execute(query)
+
+    return impala_cursor if not fetch else impala_cursor.fetchall()
+
+
+def execute_query_as_list(query):
+
+    query_results = execute_query(query)
+    results = []
+
+    # build one dict per row, keyed by the column names from the cursor description
+    for row in query_results:
+        row_result = {}
+        for x, header in enumerate(query_results.description):
+            row_result[header[0]] = row[x]
+        results.append(row_result)
+
+    return results
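
For reference, a minimal sketch of how the engine might be driven from other components; the table and column names are placeholders, and the module is assumed to be importable as common.hive_engine:

    import common.hive_engine as hive

    # fetch=True returns cursor.fetchall(), i.e. a list of plain tuples
    totals = hive.execute_query('SELECT COUNT(*) FROM sample_table', fetch=True)

    # execute_query_as_list returns one dict per row, keyed by column name
    for record in hive.execute_query_as_list('SELECT col_a, col_b FROM sample_table LIMIT 10'):
        print(record)

Note that each call opens its own connection and cursor through create_connection.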


[13/13] incubator-spot git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-spot into pr/134

Posted by na...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-spot into pr/134


Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/14dbd511
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/14dbd511
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/14dbd511

Branch: refs/heads/master
Commit: 14dbd511d78259dd5104873ec1f3d0caee66e3fe
Parents: f594956 f722127
Author: natedogs911 <na...@gmail.com>
Authored: Mon Mar 19 08:11:53 2018 -0700
Committer: natedogs911 <na...@gmail.com>
Committed: Mon Mar 19 08:11:53 2018 -0700

----------------------------------------------------------------------
 DISCLAIMER                              |  11 ++
 spot-ingest/pipelines/proxy/bluecoat.py | 177 ++++++++++++++++++---------
 spot-oa/api/resources/flow.py           |   6 +-
 spot-oa/oa/flow/flow_oa.py              |  43 ++++---
 4 files changed, 155 insertions(+), 82 deletions(-)
----------------------------------------------------------------------