Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:34:55 UTC
[14/21] incubator-sdap-mudrod git commit: SDAP-1 Import all code
under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java
new file mode 100644
index 0000000..bbfb79c
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ClassName: RequestUrl Function: request url related operations
+ */
+public class RequestUrl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RequestUrl.class);
+
+ /**
+ * Default Constructor
+ */
+ public RequestUrl() {
+ /* Default Constructor */
+ }
+
+ /**
+ * UrlPage: Get url page from url link
+ *
+ * @param strURL request url
+ * @return page name
+ */
+ public static String urlPage(String strURL) {
+ String strPage = null;
+ String[] arrSplit = null;
+
+ String newURL = strURL.trim().toLowerCase();
+
+ arrSplit = newURL.split("[?]");
+ if (newURL.length() > 0 && arrSplit.length > 1 && arrSplit[0] != null) {
+ strPage = arrSplit[0];
+ }
+
+ return strPage;
+ }
+
+ /**
+ * TruncateUrlPage: Get url params from url link
+ *
+   * @param strURL request url
+ * @return url params
+ */
+ private static String truncateUrlPage(String strURL) {
+ String strAllParam = null;
+ String[] arrSplit = null;
+
+    strURL = strURL.trim().toLowerCase(); // note: lower-cases the whole URL, including parameter values
+
+ arrSplit = strURL.split("[?]");
+ if (strURL.length() > 1) {
+ if (arrSplit.length > 1) {
+ if (arrSplit[1] != null) {
+ strAllParam = arrSplit[1];
+ }
+ }
+ }
+
+ return strAllParam;
+ }
+
+ /**
+ * URLRequest: Get url params from url link in a map format
+ *
+ * @param URL request url
+ * @return url params key value map
+ */
+ public static Map<String, String> uRLRequest(String URL) {
+ Map<String, String> mapRequest = new HashMap<String, String>();
+
+ String[] arrSplit = null;
+
+ String strUrlParam = truncateUrlPage(URL);
+ if (strUrlParam == null) {
+ return mapRequest;
+ }
+
+ arrSplit = strUrlParam.split("[&]");
+ for (String strSplit : arrSplit) {
+ String[] arrSplitEqual = null;
+ arrSplitEqual = strSplit.split("[=]");
+
+ if (arrSplitEqual.length > 1) {
+
+ mapRequest.put(arrSplitEqual[0], arrSplitEqual[1]);
+
+ } else {
+        if (!"".equals(arrSplitEqual[0])) {
+
+ mapRequest.put(arrSplitEqual[0], "");
+ }
+ }
+ }
+ return mapRequest;
+ }
+
+ /**
+ * GetSearchInfo: Get search information from url link
+ *
+ * @param URL request url
+ * @return search params
+ * @throws UnsupportedEncodingException UnsupportedEncodingException
+ */
+ public String getSearchInfo(String URL) throws UnsupportedEncodingException {
+ List<String> info = new ArrayList<String>();
+ String keyword = "";
+ Map<String, String> mapRequest = RequestUrl.uRLRequest(URL);
+ if (mapRequest.get("search") != null) {
+ try {
+ keyword = mapRequest.get("search");
+
+ keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+ if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
+ keyword = keyword.replace("%2b", " ");
+ keyword = keyword.replace("%20", " ");
+ keyword = keyword.replace("%25", " ");
+ }
+
+ keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+
+ } catch (UnsupportedEncodingException e) {
+ LOG.error(mapRequest.get("search"));
+ e.printStackTrace();
+ }
+ if (!"".equals(keyword)) {
+ info.add(keyword);
+ }
+
+ }
+
+ if (mapRequest.get("ids") != null && mapRequest.get("values") != null) {
+ String id_raw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8");
+ String value_raw = URLDecoder.decode(mapRequest.get("values"), "UTF-8");
+ String[] ids = id_raw.split(":");
+ String[] values = value_raw.split(":");
+
+ int a = ids.length;
+ int b = values.length;
+ int l = a < b ? a : b;
+
+ for (int i = 0; i < l; i++) {
+ if (ids[i].equals("collections") || ids[i].equals("measurement") || ids[i].equals("sensor") || ids[i].equals("platform") || ids[i].equals("variable") || ids[i].equals("spatialcoverage")) {
+ try {
+ values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+ if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) {
+ String item = URLDecoder.decode(values[i], "UTF-8").trim();
+ if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) {
+ item = item.replace("%2b", " ");
+ item = item.replace("%20", " ");
+ item = item.replace("%25", " ");
+ }
+ item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+ info.add(item);
+ }
+ } catch (Exception e) {
+ LOG.error(values[i]);
+ e.printStackTrace();
+ }
+ }
+
+ }
+ }
+
+ return String.join(",", info);
+ }
+
+ /**
+ * GetSearchWord: Get search words from url link
+ *
+ * @param url request url
+ * @return query
+ */
+ public static String getSearchWord(String url) {
+ String keyword = "";
+
+ Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
+ if (mapRequest.get("search") != null) {
+ try {
+ keyword = mapRequest.get("search");
+
+ keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+ if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
+ keyword = keyword.replace("%2b", " ");
+ keyword = keyword.replace("%20", " ");
+ keyword = keyword.replace("%25", " ");
+ }
+ keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+ } catch (UnsupportedEncodingException e) {
+ LOG.error(mapRequest.get("search"));
+ e.printStackTrace();
+ }
+ }
+
+ return keyword;
+ }
+
+ /**
+ * GetFilterInfo: Get filter params from url link
+ *
+ * @param url request url
+ * @return filter facet key pair map
+ * @throws UnsupportedEncodingException UnsupportedEncodingException
+ */
+ public static Map<String, String> getFilterInfo(String url) throws UnsupportedEncodingException {
+ List<String> info = new ArrayList<>();
+ Map<String, String> filterValues = new HashMap<>();
+
+ String keyword = "";
+ Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
+ if (mapRequest.get("search") != null) {
+ try {
+ keyword = mapRequest.get("search");
+
+ keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+ if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
+ keyword = keyword.replace("%2b", " ");
+ keyword = keyword.replace("%20", " ");
+ keyword = keyword.replace("%25", " ");
+ }
+ keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+
+ } catch (UnsupportedEncodingException e) {
+ LOG.error(mapRequest.get("search"));
+ e.printStackTrace();
+ }
+ if (!"".equals(keyword)) {
+ info.add(keyword);
+ }
+
+ }
+
+ if (mapRequest.get("ids") != null && mapRequest.get("values") != null) {
+ String idRaw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8");
+ String valueRaw = URLDecoder.decode(mapRequest.get("values"), "UTF-8");
+ String[] ids = idRaw.split(":");
+ String[] values = valueRaw.split(":");
+
+ int a = ids.length;
+ int b = values.length;
+ int l = a < b ? a : b;
+
+ for (int i = 0; i < l; i++) {
+ try {
+ values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+ if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) {
+ String item = URLDecoder.decode(values[i], "UTF-8").trim();
+ if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) {
+ item = item.replace("%2b", " ");
+ item = item.replace("%20", " ");
+ item = item.replace("%25", " ");
+ }
+ item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+ filterValues.put(ids[i], item);
+ }
+ } catch (Exception e) {
+ LOG.error(values[i]);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if (mapRequest.get("temporalsearch") != null) {
+ String temporalsearch = mapRequest.get("temporalsearch");
+ temporalsearch = URLDecoder.decode(temporalsearch.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+
+ filterValues.put("temporalsearch", temporalsearch);
+ }
+
+ return filterValues;
+ }
+
+}
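
A minimal sketch of how RequestUrl's static helpers could be exercised (the search URL is hypothetical; the expected values in the comments follow from the parsing logic above):

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    import gov.nasa.jpl.mudrod.weblog.structure.RequestUrl;

    public class RequestUrlDemo {
      public static void main(String[] args) throws UnsupportedEncodingException {
        // hypothetical PO.DAAC-style search request
        String url = "/datasetlist?search=ocean%20wind&ids=sensor:platform&values=ASCAT:METOP-A";

        String page = RequestUrl.urlPage(url);                       // "/datasetlist"
        Map<String, String> params = RequestUrl.uRLRequest(url);     // lower-cased raw parameters
        String keyword = RequestUrl.getSearchWord(url);              // "ocean wind"
        Map<String, String> filters = RequestUrl.getFilterInfo(url); // {sensor=ascat, platform=metop a}

        System.out.println(page + " | " + params + " | " + keyword + " | " + filters);
      }
    }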
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java
new file mode 100644
index 0000000..93f4288
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ClassName: Session Function: Session operations.
+ */
+public class Session /*extends MudrodAbstract*/ implements Comparable<Session> {
+ private static final Logger LOG = LoggerFactory.getLogger(Session.class);
+ // start: start time of session
+ private String start;
+ // end: end time of session
+ private String end;
+ // id: original session ID
+ private String id;
+ // newid: new session ID
+ private String newid = null;
+ // fmt: time formatter
+ private DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+
+ private ESDriver es;
+ private Properties props;
+
+ /**
+ * Creates a new instance of Session.
+ *
+ * @param props the Mudrod configuration
+   * @param es the Elasticsearch driver
+ * @param start start time of session
+ * @param end end time of session
+ * @param id session ID
+ */
+ public Session(Properties props, ESDriver es, String start, String end, String id) {
+ this.start = start;
+ this.end = end;
+ this.id = id;
+
+ this.props = props;
+ this.es = es;
+ }
+
+ /**
+ * Creates a new instance of Session.
+ *
+ * @param props the Mudrod configuration
+   * @param es the Elasticsearch driver
+ */
+ public Session(Properties props, ESDriver es) {
+ this.props = props;
+ this.es = es;
+ }
+
+ /**
+ * getID: Get original session ID
+ *
+ * @return session id
+ */
+ public String getID() {
+ return id;
+ }
+
+ /**
+ * getNewID: Get new session ID
+ *
+ * @return new session id
+ */
+ public String getNewID() {
+ return newid;
+ }
+
+ /**
+ * setNewID: Set new session ID
+ *
+ * @param str: session ID
+ * @return new session id
+ */
+ public String setNewID(String str) {
+ return newid = str;
+ }
+
+ /**
+ * getStartTime:Get start time of current session
+ *
+ * @return start time of session
+ */
+ public String getStartTime() {
+ return start;
+ }
+
+ /**
+ * getEndTime:Get end time of current session
+ *
+ * @return end time of session
+ */
+ public String getEndTime() {
+ return end;
+ }
+
+ /**
+ * Compare current session with another session
+ *
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ @Override
+ public int compareTo(Session o) {
+    // ascending order by session end time
+    return Seconds.secondsBetween(fmt.parseDateTime(o.end), fmt.parseDateTime(this.end)).getSeconds();
+
+ }
+
+ /**
+ * getSessionDetail:Get detail of current session, which is used for session
+   * tree reconstruction
+ *
+ * @param indexName name of index from which you wish to obtain session detail.
+ * @param type: Session type name in Elasticsearch
+ * @param sessionID: Session ID
+ * @return Session details in Json format
+ */
+ public JsonObject getSessionDetail(String indexName, String type, String sessionID) {
+ JsonObject sessionResults = new JsonObject();
+ // for session tree
+ SessionTree tree = null;
+ JsonElement jsonRequest = null;
+ try {
+ tree = this.getSessionTree(indexName, type, sessionID);
+ JsonObject jsonTree = tree.treeToJson(tree.root);
+ sessionResults.add("treeData", jsonTree);
+
+ jsonRequest = this.getRequests(type, sessionID);
+ sessionResults.add("RequestList", jsonRequest);
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Encoding error detected.", e);
+
+ }
+
+ return sessionResults;
+ }
+
+ /**
+   * getClickStreamList: Extract click stream list from current session.
+ *
+ * @param indexName an index from which to query for a session list
+ * @param type: Session type name in Elasticsearch
+ * @param sessionID: Session ID
+   * @return Click stream data list
+ * {@link ClickStream}
+ */
+ public List<ClickStream> getClickStreamList(String indexName, String type, String sessionID) {
+ SessionTree tree = null;
+ try {
+ tree = this.getSessionTree(indexName, type, sessionID);
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Erro whilst obtaining the Session Tree: {}", e);
+ }
+
+ List<ClickStream> clickthroughs = tree.getClickStreamList();
+ return clickthroughs;
+ }
+
+ /**
+ * Method of converting a given session to a tree structure
+ *
+   * @param indexName name of the index holding the session records
+   * @param type session type name in Elasticsearch
+ * @param sessionID ID of session
+ * @return an instance of session tree structure
+ * @throws UnsupportedEncodingException UnsupportedEncodingException
+ */
+ private SessionTree getSessionTree(String indexName, String type, String sessionID) throws UnsupportedEncodingException {
+
+ SearchResponse response = es.getClient().prepareSearch(indexName).setTypes(type).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100).addSort("Time", SortOrder.ASC)
+ .execute().actionGet();
+
+ SessionTree tree = new SessionTree(this.props, this.es, sessionID, type);
+ int seq = 1;
+ for (SearchHit hit : response.getHits().getHits()) {
+ Map<String, Object> result = hit.getSource();
+ String request = (String) result.get("Request");
+ String time = (String) result.get("Time");
+ String logType = (String) result.get("LogType");
+ String referer = (String) result.get("Referer");
+
+ SessionNode node = new SessionNode(request, logType, referer, time, seq);
+ tree.insert(node);
+ seq++;
+ }
+
+ return tree;
+ }
+
+ /**
+ * Method of getting all requests from a given current session
+ *
+ * @param cleanuptype Session type name in Elasticsearch
+ * @param sessionID Session ID
+ * @return all of these requests in JSON
+ * @throws UnsupportedEncodingException UnsupportedEncodingException
+ */
+ private JsonElement getRequests(String cleanuptype, String sessionID) throws UnsupportedEncodingException {
+ SearchResponse response = es.getClient().prepareSearch(props.getProperty("indexName")).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100)
+ .addSort("Time", SortOrder.ASC).execute().actionGet();
+
+ Gson gson = new Gson();
+ List<JsonObject> requestList = new ArrayList<>();
+ int seq = 1;
+ for (SearchHit hit : response.getHits().getHits()) {
+ Map<String, Object> result = hit.getSource();
+ String request = (String) result.get("Request");
+ String requestUrl = (String) result.get("RequestUrl");
+ String time = (String) result.get("Time");
+ String logType = (String) result.get("LogType");
+ String referer = (String) result.get("Referer");
+
+ JsonObject req = new JsonObject();
+ req.addProperty("Time", time);
+ req.addProperty("Request", request);
+ req.addProperty("RequestURL", requestUrl);
+ req.addProperty("LogType", logType);
+ req.addProperty("Referer", referer);
+ req.addProperty("Seq", seq);
+ requestList.add(req);
+
+ seq++;
+ }
+ return gson.toJsonTree(requestList);
+ }
+
+ /**
+   * getRankingTrainData: Extract ranking training data from current session.
+ *
+ * @param indexName an index from which to obtain ranked training data.
+ * @param cleanuptype: Session type name in Elasticsearch
+ * @param sessionID: Session ID
+   * @return ranking training data list
+   * {@link RankingTrainData}
+ */
+ public List<RankingTrainData> getRankingTrainData(String indexName, String cleanuptype, String sessionID) {
+ SessionTree tree = null;
+ try {
+ tree = this.getSessionTree(indexName, cleanuptype, sessionID);
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Error whilst retreiving Session Tree: {}", e);
+ }
+
+ List<RankingTrainData> trainData = new ArrayList<>();
+ try {
+ trainData = tree.getRankingTrainData(indexName, sessionID);
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("Error whilst retreiving ranking training data: {}", e);
+ }
+
+ return trainData;
+ }
+}
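
A short usage sketch for Session, assuming the configuration points at a running Elasticsearch cluster and that the index, type and session ID values (placeholders here) exist in it; ESDriver is built from the configuration alone, as elsewhere in this commit:

    import java.util.Properties;

    import com.google.gson.JsonObject;
    import gov.nasa.jpl.mudrod.driver.ESDriver;
    import gov.nasa.jpl.mudrod.weblog.structure.Session;

    public class SessionDemo {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();        // would normally be loaded from config.xml
        props.setProperty("indexName", "mudrod");   // hypothetical index name

        ESDriver es = new ESDriver(props);

        Session session = new Session(props, es);
        JsonObject detail = session.getSessionDetail("podaac_log", "cleanup_log", "session-0001");
        System.out.println(detail);                 // contains "treeData" and "RequestList"

        es.close();
      }
    }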
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java
new file mode 100644
index 0000000..edba32e
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ClassName: SessionExtractor Function: Extract session details from
+ * reconstructed sessions.
+ */
+public class SessionExtractor implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SessionExtractor.class);
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public SessionExtractor() {
+ // default constructor
+ }
+
+ /**
+ * extractClickStreamFromES:Extract click streams from logs stored in
+ * Elasticsearch
+ *
+ * @param props
+ * the Mudrod configuration
+ * @param es
+   *          the Elasticsearch driver
+ * @param spark
+ * the spark driver
+ * @return clickstream list in JavaRDD format {@link ClickStream}
+ */
+ public JavaRDD<ClickStream> extractClickStreamFromES(Properties props, ESDriver es, SparkDriver spark) {
+ switch (props.getProperty(MudrodConstants.PROCESS_TYPE)) {
+ case "sequential":
+ List<ClickStream> queryList = this.getClickStreamList(props, es);
+ return spark.sc.parallelize(queryList);
+ case "parallel":
+ return getClickStreamListInParallel(props, spark, es);
+ default:
+ LOG.error("Error finding processing type for '{}'. Please check your config.xml.", props.getProperty(MudrodConstants.PROCESS_TYPE));
+ }
+ return null;
+ }
+
+ /**
+ * getClickStreamList:Extract click streams from logs stored in Elasticsearch.
+ *
+ * @param props
+ * the Mudrod configuration
+ * @param es
+ * the Elasticsearch driver
+ * @return clickstream list {@link ClickStream}
+ */
+ protected List<ClickStream> getClickStreamList(Properties props, ESDriver es) {
+ List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+ List<ClickStream> result = new ArrayList<>();
+ for (int n = 0; n < logIndexList.size(); n++) {
+ String logIndex = logIndexList.get(n);
+ List<String> sessionIdList;
+ try {
+ sessionIdList = this.getSessions(props, es, logIndex);
+ Session session = new Session(props, es);
+ int sessionNum = sessionIdList.size();
+ for (int i = 0; i < sessionNum; i++) {
+ String[] sArr = sessionIdList.get(i).split(",");
+ List<ClickStream> datas = session.getClickStreamList(sArr[1], sArr[2], sArr[0]);
+ result.addAll(datas);
+ }
+ } catch (Exception e) {
+ LOG.error("Error during extraction of Clickstreams from log index. {}", e);
+ }
+ }
+
+ return result;
+ }
+
+ protected JavaRDD<ClickStream> getClickStreamListInParallel(Properties props, SparkDriver spark, ESDriver es) {
+
+ List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+ LOG.info("Retrieved {}", logIndexList.toString());
+
+ List<String> sessionIdList = new ArrayList<>();
+ for (int n = 0; n < logIndexList.size(); n++) {
+ String logIndex = logIndexList.get(n);
+ List<String> tmpsessionList = this.getSessions(props, es, logIndex);
+ sessionIdList.addAll(tmpsessionList);
+ }
+
+ JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);
+
+ JavaRDD<ClickStream> clickStreamRDD = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, ClickStream>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Iterator<ClickStream> call(Iterator<String> arg0) throws Exception {
+ ESDriver tmpES = new ESDriver(props);
+ tmpES.createBulkProcessor();
+
+ Session session = new Session(props, tmpES);
+ List<ClickStream> clickstreams = new ArrayList<>();
+ while (arg0.hasNext()) {
+ String s = arg0.next();
+ String[] sArr = s.split(",");
+ List<ClickStream> clicks = session.getClickStreamList(sArr[1], sArr[2], sArr[0]);
+ clickstreams.addAll(clicks);
+ }
+ tmpES.destroyBulkProcessor();
+ tmpES.close();
+ return clickstreams.iterator();
+ }
+ });
+
+ LOG.info("Clickstream number: {}", clickStreamRDD.count());
+
+ return clickStreamRDD;
+ }
+
+ // This function is reserved and not being used for now
+
+ /**
+   * loadClickStremFromTxt: Load click stream from txt file
+ *
+ * @param clickthroughFile
+ * txt file
+ * @param sc
+ * the spark context
+ * @return clickstream list in JavaRDD format {@link ClickStream}
+ */
+ public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
+ return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Iterator<ClickStream> call(String line) throws Exception {
+ List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
+        return clickthroughs.iterator();
+ }
+ });
+ }
+
+ /**
+   * bulidDataQueryRDD: convert click stream list to (dataset, query list) pairs.
+ *
+ * @param clickstreamRDD:
+ * click stream data
+ * @param downloadWeight:
+ * weight of download behavior
+ * @return JavaPairRDD, key is short name of data set, and values are queries
+ */
+ public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
+ return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
+ List<String> query = new ArrayList<>();
+        // important! download behavior is given a higher weight than viewing behavior
+ boolean download = click.isDownload();
+ int weight = 1;
+ if (download) {
+ weight = downloadWeight;
+ }
+ for (int i = 0; i < weight; i++) {
+ query.add(click.getKeyWords());
+ }
+
+ return new Tuple2<>(click.getViewDataset(), query);
+ }
+ }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public List<String> call(List<String> v1, List<String> v2) throws Exception {
+ List<String> list = new ArrayList<>();
+ list.addAll(v1);
+ list.addAll(v2);
+ return list;
+ }
+ });
+ }
+
+ /**
+ * getSessions: Get sessions from logs
+ *
+ * @param props
+ * the Mudrod configuration
+ * @param es
+ * the Elasticsearch driver
+ * @param logIndex
+ * a log index name
+ * @return list of session names
+ */
+ protected List<String> getSessions(Properties props, ESDriver es, String logIndex) {
+
+ String cleanupPrefix = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
+ String sessionStatPrefix = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);
+
+ List<String> sessions = new ArrayList<>();
+ SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(sessionStatPrefix).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+ .actionGet();
+ while (true) {
+ for (SearchHit hit : scrollResp.getHits().getHits()) {
+ Map<String, Object> session = hit.getSource();
+ String sessionID = (String) session.get("SessionID");
+ sessions.add(sessionID + "," + logIndex + "," + cleanupPrefix);
+ }
+
+ scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ if (scrollResp.getHits().getHits().length == 0) {
+ break;
+ }
+ }
+
+ return sessions;
+ }
+
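+  /**
+   * bulidUserItermRDD: Convert click stream data to user-item rating pairs.
+   * Each key is "user,dataset", where the user is taken from the session ID;
+   * a download is rated 2.0, a view 1.0, and the highest rating observed for
+   * a pair is kept.
+   *
+   * @param clickstreamRDD click stream data
+   * @return JavaPairRDD, key is "user,dataset" and value is the rating
+   */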
+ public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
+ return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Double> call(ClickStream click) throws Exception {
+ double rate = 1;
+ boolean download = click.isDownload();
+ if (download) {
+ rate = 2;
+ }
+
+ String sessionID = click.getSessionID();
+ String user = sessionID.split("@")[0];
+
+ return new Tuple2<>(user + "," + click.getViewDataset(), rate);
+ }
+ }).reduceByKey(new Function2<Double, Double, Double>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Double call(Double v1, Double v2) throws Exception {
+ return v1 >= v2 ? v1 : v2;
+
+ }
+ });
+ }
+
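+  /**
+   * bulidSessionItermRDD: Convert click stream data to session-item pairs.
+   * Sessions associated with fewer than two distinct datasets are dropped;
+   * each remaining pair is keyed by "sessionID,dataset" with a value of 1.0.
+   *
+   * @param clickstreamRDD click stream data
+   * @return JavaPairRDD, key is "sessionID,dataset" and value is 1.0
+   */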
+ public JavaPairRDD<String, Double> bulidSessionItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
+ JavaPairRDD<String, String> sessionItemRDD = clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, String>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, String> call(ClickStream click) throws Exception {
+
+ String sessionID = click.getSessionID();
+ return new Tuple2<>(sessionID, click.getViewDataset());
+ }
+ }).distinct();
+
+ // remove some sessions
+ JavaPairRDD<String, Double> sessionItemNumRDD = sessionItemRDD.keys().mapToPair(new PairFunction<String, String, Double>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Double> call(String item) throws Exception {
+ return new Tuple2<>(item, 1.0);
+ }
+ }).reduceByKey(new Function2<Double, Double, Double>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Double call(Double v1, Double v2) throws Exception {
+ return v1 + v2;
+ }
+ }).filter(new Function<Tuple2<String, Double>, Boolean>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Boolean call(Tuple2<String, Double> arg0) throws Exception {
+        // keep only sessions associated with at least two viewed datasets
+        return arg0._2 >= 2;
+ }
+ });
+
+ return sessionItemNumRDD.leftOuterJoin(sessionItemRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<Double, Optional<String>>>, String, Double>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Double> call(Tuple2<String, Tuple2<Double, Optional<String>>> arg0) throws Exception {
+
+ Tuple2<Double, Optional<String>> test = arg0._2;
+ Optional<String> optStr = test._2;
+ String item = "";
+ if (optStr.isPresent()) {
+ item = optStr.get();
+ }
+ return new Tuple2<>(arg0._1 + "," + item, 1.0);
+ }
+
+ });
+ }
+
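+  /**
+   * bulidSessionDatasetRDD: Read session statistics from Elasticsearch and map
+   * each session to the list of distinct datasets viewed within it.
+   *
+   * @param props the Mudrod configuration
+   * @param es the Elasticsearch driver
+   * @param spark the spark driver
+   * @return JavaPairRDD, key is session ID and value is the list of viewed datasets
+   */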
+ public JavaPairRDD<String, List<String>> bulidSessionDatasetRDD(Properties props, ESDriver es, SparkDriver spark) {
+
+ List<String> result = new ArrayList<>();
+ List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+ for (int n = 0; n < logIndexList.size(); n++) {
+ String logIndex = logIndexList.get(n);
+ SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(props.getProperty(MudrodConstants.SESSION_STATS_PREFIX)).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery())
+ .setSize(100).execute().actionGet();
+ while (true) {
+ for (SearchHit hit : scrollResp.getHits().getHits()) {
+ Map<String, Object> session = hit.getSource();
+ String sessionID = (String) session.get("SessionID");
+ String views = (String) session.get("views");
+ if (views != null && !"".equals(views)) {
+ String sessionItems = sessionID + ":" + views;
+ result.add(sessionItems);
+ }
+ }
+
+ scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ if (scrollResp.getHits().getHits().length == 0) {
+ break;
+ }
+ }
+ }
+
+ JavaRDD<String> sessionRDD = spark.sc.parallelize(result);
+
+ return sessionRDD.mapToPair(new PairFunction<String, String, List<String>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, List<String>> call(String sessionitem) throws Exception {
+ String[] splits = sessionitem.split(":");
+ String sessionId = splits[0];
+ List<String> itemList = new ArrayList<>();
+
+ String items = splits[1];
+ String[] itemArr = items.split(",");
+ int size = itemArr.length;
+ for (int i = 0; i < size; i++) {
+ String item = itemArr[i];
+ if (!itemList.contains(item))
+ itemList.add(itemArr[i]);
+ }
+
+ return new Tuple2<>(sessionId, itemList);
+ }
+ });
+ }
+
+ /**
+   * extractRankingTrainData: Extract ranking training data from logs stored in
+   * Elasticsearch
+ *
+ * @param props
+ * the Mudrod configuration
+ * @param es
+   *          the Elasticsearch driver
+ * @param spark
+ * the spark driver
+   * @return ranking training data list in JavaRDD format {@link RankingTrainData}
+ */
+ public JavaRDD<RankingTrainData> extractRankingTrainData(Properties props, ESDriver es, SparkDriver spark) {
+
+ List<RankingTrainData> queryList = this.extractRankingTrainData(props, es);
+ return spark.sc.parallelize(queryList);
+
+ }
+
+ /**
+   * extractRankingTrainData: Extract ranking training data from logs stored in Elasticsearch.
+ *
+ * @param props
+ * the Mudrod configuration
+ * @param es
+ * the Elasticsearch driver
+   * @return ranking training data list {@link RankingTrainData}
+ */
+ protected List<RankingTrainData> extractRankingTrainData(Properties props, ESDriver es) {
+ List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+ LOG.info(logIndexList.toString());
+
+ List<RankingTrainData> result = new ArrayList<>();
+ for (int n = 0; n < logIndexList.size(); n++) {
+ String logIndex = logIndexList.get(n);
+ List<String> sessionIdList;
+ try {
+ sessionIdList = this.getSessions(props, es, logIndex);
+ Session session = new Session(props, es);
+ int sessionNum = sessionIdList.size();
+ for (int i = 0; i < sessionNum; i++) {
+ String[] sArr = sessionIdList.get(i).split(",");
+ List<RankingTrainData> datas = session.getRankingTrainData(sArr[1], sArr[2], sArr[0]);
+ result.addAll(datas);
+ }
+ } catch (Exception e) {
+ LOG.error("Error which extracting ranking train data: {}", e);
+ }
+ }
+
+ return result;
+ }
+
+ protected JavaRDD<RankingTrainData> extractRankingTrainDataInParallel(Properties props, SparkDriver spark, ESDriver es) {
+
+ List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+ LOG.info(logIndexList.toString());
+
+ List<String> sessionIdList = new ArrayList<>();
+ for (int n = 0; n < logIndexList.size(); n++) {
+ String logIndex = logIndexList.get(n);
+ List<String> tmpsessionList = this.getSessions(props, es, logIndex);
+ sessionIdList.addAll(tmpsessionList);
+ }
+
+ JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);
+
+ JavaRDD<RankingTrainData> clickStreamRDD = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, RankingTrainData>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Iterator<RankingTrainData> call(Iterator<String> arg0) throws Exception {
+ ESDriver tmpES = new ESDriver(props);
+ tmpES.createBulkProcessor();
+
+ Session session = new Session(props, tmpES);
+ List<RankingTrainData> clickstreams = new ArrayList<>();
+ while (arg0.hasNext()) {
+ String s = arg0.next();
+ String[] sArr = s.split(",");
+ List<RankingTrainData> clicks = session.getRankingTrainData(sArr[1], sArr[2], sArr[0]);
+ clickstreams.addAll(clicks);
+ }
+ tmpES.destroyBulkProcessor();
+ tmpES.close();
+ return clickstreams.iterator();
+ }
+ });
+
+ LOG.info("Clickstream number: {}", clickStreamRDD.count());
+
+ return clickStreamRDD;
+ }
+
+}
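
A usage sketch for the extractor. It assumes the remaining Mudrod properties (log index prefix, session type names) are already present in the configuration, and that SparkDriver, which is only used here through its sc field, can likewise be constructed from the configuration:

    import java.util.Properties;

    import org.apache.spark.api.java.JavaRDD;

    import gov.nasa.jpl.mudrod.driver.ESDriver;
    import gov.nasa.jpl.mudrod.driver.SparkDriver;
    import gov.nasa.jpl.mudrod.main.MudrodConstants;
    import gov.nasa.jpl.mudrod.weblog.structure.ClickStream;
    import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor;

    public class SessionExtractorDemo {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();            // would normally be loaded from config.xml
        props.setProperty(MudrodConstants.PROCESS_TYPE, "parallel");

        ESDriver es = new ESDriver(props);
        SparkDriver spark = new SparkDriver(props);     // constructor assumed for this sketch

        JavaRDD<ClickStream> clicks = new SessionExtractor().extractClickStreamFromES(props, es, spark);
        System.out.println("clickstreams: " + clicks.count());

        es.close();
      }
    }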
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java
new file mode 100644
index 0000000..958e184
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * ClassName: SessionNode Function: Functions related to a node in a session
+ * tree structure.
+ */
+public class SessionNode {
+ // id: Node ID
+ protected String id;
+ // value: Node value
+ protected String value;
+ // parent: Parent node of this node
+ protected SessionNode parent;
+ // children: Child nodes of this node
+ protected List<SessionNode> children = new ArrayList<>();
+ // time: request time of node
+ protected String time;
+ // request: request url of this node
+ protected String request;
+ // referer: previous request url of this node
+ protected String referer;
+ // seq: sequence of this node
+ protected int seq;
+ // key: type of this node extracted from url, including three types -
+ // dataset,datasetlist,ftp
+ protected String key;
+  // logType: log types of this node, including two types - po.daac, ftp
+ protected String logType;
+ // search: query extracted from this node
+ protected String search;
+ // filter: filter facets extracted from this node
+ protected Map<String, String> filter;
+ // datasetId: viewed/downloaded data set ID
+ protected String datasetId;
+
+ public SessionNode() {
+
+ }
+
+ /**
+ * Creates a new instance of SessionNode.
+ *
+ * @param request: request url
+   * @param logType: including two types - po.daac, ftp
+ * @param referer: previous request url
+ * @param time: request time of node
+ * @param seq: sequence of this node
+ */
+ public SessionNode(String request, String logType, String referer, String time, int seq) {
+ this.logType = logType;
+ this.time = time;
+ this.seq = seq;
+ this.setRequest(request);
+ this.setReferer(referer);
+ this.setKey(request, logType);
+ }
+
+ /**
+ * setReferer: Set previous request url of this node
+ *
+ * @param referer previous request url
+ */
+ public void setReferer(String referer) {
+ if (referer == null) {
+ this.referer = "";
+ return;
+ }
+ this.referer = referer.toLowerCase().replace("http://podaac.jpl.nasa.gov", "");
+ }
+
+ /**
+ * setRequest: Set request url of this node
+ *
+ * @param req request url
+ */
+ public void setRequest(String req) {
+ this.request = req;
+ if (this.logType.equals("PO.DAAC")) {
+ this.parseRequest(req);
+ }
+ }
+
+ /**
+ * getChildren:Get child nodes of this node
+ *
+ * @return child nodes
+ */
+ public List<SessionNode> getChildren() {
+ return this.children;
+ }
+
+ /**
+ * setChildren: Set child nodes of this node
+ *
+ * @param children child nodes of this node
+ */
+ public void setChildren(List<SessionNode> children) {
+ this.children = children;
+ }
+
+ /**
+   * addChildren: Add a child node
+ *
+ * @param node session node
+ */
+ public void addChildren(SessionNode node) {
+ this.children.add(node);
+ }
+
+ /**
+ * getId:Get node ID
+ *
+ * @return node ID of this node
+ */
+ public String getId() {
+ return this.id;
+ }
+
+ /**
+ * bSame:Compare this node with another node
+ *
+ * @param node {@link SessionNode}
+   * @return boolean value, true means the two nodes are the same
+ */
+ public Boolean bSame(SessionNode node) {
+ Boolean bsame = false;
+ if (this.request.equals(node.request)) {
+ bsame = true;
+ }
+ return bsame;
+ }
+
+ /**
+ * setKey:Set request type which contains three categories -
+ * dataset,datasetlist,ftp
+ *
+ * @param request request url
+ * @param logType url type
+ */
+ public void setKey(String request, String logType) {
+ this.key = "";
+ String datasetlist = "/datasetlist?";
+ String dataset = "/dataset/";
+ if (logType.equals("ftp")) {
+ this.key = "ftp";
+ } else if (logType.equals("root")) {
+ this.key = "root";
+ } else {
+ if (request.contains(datasetlist)) {
+ this.key = "datasetlist";
+ } else if (request.contains(dataset) /* || request.contains(granule) */) {
+ this.key = "dataset";
+ }
+ }
+ }
+
+ /**
+ * getKey:Get request type which contains three categories -
+ * dataset,datasetlist,ftp
+ *
+ * @return request url type of this node
+ */
+ public String getKey() {
+ return this.key;
+ }
+
+ /**
+ * getRequest:Get node request
+ *
+ * @return request url of this node
+ */
+ public String getRequest() {
+ return this.request;
+ }
+
+ /**
+ * getReferer:Get previous request url of this node
+ *
+ * @return previous request url of this node
+ */
+ public String getReferer() {
+ return this.referer;
+ }
+
+ /**
+ * getParent:Get parent node of this node
+ *
+ * @return parent node of this node
+ */
+ public SessionNode getParent() {
+ return this.parent;
+ }
+
+ /**
+ * setParent: Set parent node of this node
+ *
+ * @param parent the previous request node of this node
+ */
+ public void setParent(SessionNode parent) {
+ this.parent = parent;
+ }
+
+ /**
+ * getSearch:Get query of this node
+ *
+ * @return search query of this node
+ */
+ public String getSearch() {
+ return this.search;
+ }
+
+ /**
+ * getFilter:Get filter facets of this node
+ *
+ * @return filter values of this node
+ */
+ public Map<String, String> getFilter() {
+ return this.filter;
+ }
+
+ /**
+ * getDatasetId:Get data set ID of this node
+ *
+ * @return viewing/downloading data set of this node
+ */
+ public String getDatasetId() {
+ return this.datasetId;
+ }
+
+ /**
+ * getSeq:Get sequence of this node
+ *
+ * @return request sequence of this node
+ */
+ public int getSeq() {
+ return this.seq;
+ }
+
+ /**
+ * getFilterStr:Get filter facets of this node
+ *
+ * @return filters values of this node
+ */
+ public String getFilterStr() {
+ String filter = "";
+ if (this.filter.size() > 0) {
+ Iterator iter = this.filter.keySet().iterator();
+ while (iter.hasNext()) {
+ String key = (String) iter.next();
+ String val = this.filter.get(key);
+ filter += key + "=" + val + ",";
+ }
+
+ filter = filter.substring(0, filter.length() - 1);
+ }
+
+ return filter;
+ }
+
+ /**
+ * parseRequest:Parse request to extract request type
+ *
+ * @param request request url of this node
+ */
+ public void parseRequest(String request) {
+ Pattern pattern = Pattern.compile("get (.*?) http/*");
+ Matcher matcher = pattern.matcher(request.trim().toLowerCase());
+ while (matcher.find()) {
+ request = matcher.group(1);
+ }
+ if (request.contains("/dataset/")) {
+ this.parseDatasetId(request);
+ }
+
+ this.request = request.toLowerCase();
+ }
+
+ /**
+ * parseFilterParams:Parse filter facets information
+ *
+ * @param params filter key value pairs of this node
+ */
+ private void parseFilterParams(Map<String, String> params) {
+ this.filter = new HashMap<String, String>();
+ if (params.containsKey("ids")) {
+ String idsStr = params.get("ids");
+ if (!idsStr.equals("")) {
+ idsStr = URLDecoder.decode(idsStr);
+ String[] ids = idsStr.split(":");
+ String valueStr = params.get("values");
+ if (valueStr != null) {
+ valueStr = URLDecoder.decode(valueStr);
+ String[] values = valueStr.split(":");
+          int size = Math.min(ids.length, values.length); // guard against mismatched ids/values lengths
+ for (int i = 0; i < size; i++) {
+ this.filter.put(ids[i], values[i]);
+ }
+ }
+ }
+ }
+
+ if (!this.search.equals("")) {
+ this.filter.put("search", this.search);
+ }
+ }
+
+ /**
+ * parseDatasetId:Parse Request to extract data set ID
+ *
+ * @param request request url
+ */
+ public void parseDatasetId(String request) {
+ try {
+ request = URLDecoder.decode(request, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ String[] twoparts = request.split("[?]");
+ String[] parts = twoparts[0].split("/");
+ if (parts.length <= 2) {
+ return;
+ }
+ this.datasetId = parts[2];
+ }
+}
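
A small sketch of how a node parses a raw PO.DAAC request line (the log entry is made up; the expected results in the comments follow from parseRequest, parseDatasetId and setKey above):

    import gov.nasa.jpl.mudrod.weblog.structure.SessionNode;

    public class SessionNodeDemo {
      public static void main(String[] args) {
        // hypothetical access-log request line
        String request = "GET /dataset/ASCATB-L2-25KM HTTP/1.1";

        SessionNode node = new SessionNode(request, "PO.DAAC", "/datasetlist?search=wind", "2017-01-01T00:00:00Z", 1);

        System.out.println(node.getKey());       // "dataset"
        System.out.println(node.getDatasetId()); // "ascatb-l2-25km"
        System.out.println(node.getRequest());   // "/dataset/ascatb-l2-25km"
      }
    }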
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java
new file mode 100644
index 0000000..46c8d0c
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * ClassName: SessionTree Function: Convert request list in a session to a tree
+ */
+public class SessionTree extends MudrodAbstract {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(SessionTree.class);
+ // size: node numbers in the session tree
+ public int size = 0;
+ // root: root node of session tree
+ protected SessionNode root = null;
+ // binsert: indicates inserting a node or not
+ public boolean binsert = false;
+  // tmpnode: temporary node
+ public SessionNode tmpnode;
+ // latestDatasetnode: the latest inserted node whose key is "dataset"
+ public SessionNode latestDatasetnode;
+ // sessionID: session ID
+ private String sessionID;
+ // cleanupType: session type in Elasticsearch
+ private String cleanupType;
+
+ /**
+ * Creates a new instance of SessionTree.
+ *
+ * @param props: the Mudrod configuration
+   * @param es: the Elasticsearch driver
+ * @param rootData: root node of the tree
+ * @param sessionID: session ID
+ * @param cleanupType: session type
+ */
+ public SessionTree(Properties props, ESDriver es, SessionNode rootData, String sessionID, String cleanupType) {
+ super(props, es, null);
+ root = new SessionNode("root", "root", "", "", 0);
+ tmpnode = root;
+ this.sessionID = sessionID;
+ this.cleanupType = cleanupType;
+ }
+
+ /**
+ * Creates a new instance of SessionTree.
+ *
+ * @param props: the Mudrod configuration
+   * @param es: the Elasticsearch driver
+ * @param sessionID: session ID
+ * @param cleanupType: session type
+ */
+ public SessionTree(Properties props, ESDriver es, String sessionID, String cleanupType) {
+ super(props, es, null);
+ root = new SessionNode("root", "root", "", "", 0);
+ root.setParent(root);
+ tmpnode = root;
+ this.sessionID = sessionID;
+ this.cleanupType = cleanupType;
+ }
+
+ /**
+ * insert: insert a node into the session tree.
+ *
+ * @param node {@link SessionNode}
+ * @return session node
+ */
+ public SessionNode insert(SessionNode node) {
+ // begin with datasetlist
+ if (node.getKey().equals("datasetlist")) {
+ this.binsert = true;
+ }
+ if (!this.binsert) {
+ return null;
+ }
+ // remove unrelated node
+ if (!node.getKey().equals("datasetlist") && !node.getKey().equals("dataset") && !node.getKey().equals("ftp")) {
+ return null;
+ }
+    // remove duplicated click
+ if (node.getRequest().equals(tmpnode.getRequest())) {
+ return null;
+ }
+ // search insert node
+ SessionNode parentnode = this.searchParentNode(node);
+ if (parentnode == null) {
+ return null;
+ }
+ node.setParent(parentnode);
+ parentnode.addChildren(node);
+
+ // record insert node
+ tmpnode = node;
+ if ("dataset".equals(node.getKey())) {
+ latestDatasetnode = node;
+ }
+
+ size++;
+ return node;
+ }
+
+ /**
+ * printTree: Print session tree
+ *
+ * @param node root node of the session tree
+ */
+ public void printTree(SessionNode node) {
+ LOG.info("node: {} \n", node.getRequest());
+    if (!node.children.isEmpty()) {
+ for (int i = 0; i < node.children.size(); i++) {
+ printTree(node.children.get(i));
+ }
+ }
+ }
+
+ /**
+ * TreeToJson: Convert the session tree to Json object
+ *
+ * @param node node of the session tree
+ * @return tree content in Json format
+ */
+ public JsonObject treeToJson(SessionNode node) {
+ Gson gson = new Gson();
+ JsonObject json = new JsonObject();
+
+ json.addProperty("seq", node.getSeq());
+ if ("datasetlist".equals(node.getKey())) {
+ json.addProperty("icon", "./resources/images/searching.png");
+ json.addProperty("name", node.getRequest());
+ } else if ("dataset".equals(node.getKey())) {
+ json.addProperty("icon", "./resources/images/viewing.png");
+ json.addProperty("name", node.getDatasetId());
+ } else if ("ftp".equals(node.getKey())) {
+ json.addProperty("icon", "./resources/images/downloading.png");
+ json.addProperty("name", node.getRequest());
+ } else if ("root".equals(node.getKey())) {
+ json.addProperty("name", "");
+ json.addProperty("icon", "./resources/images/users.png");
+ }
+
+ if (!node.children.isEmpty()) {
+ List<JsonObject> jsonChildren = new ArrayList<>();
+ for (int i = 0; i < node.children.size(); i++) {
+ JsonObject jsonChild = treeToJson(node.children.get(i));
+ jsonChildren.add(jsonChild);
+ }
+ JsonElement jsonElement = gson.toJsonTree(jsonChildren);
+ json.add("children", jsonElement);
+ }
+
+ return json;
+ }
+
+ /**
+ * getClickStreamList: Get click stream list in the session
+ *
+ * @return {@link ClickStream}
+ */
+ public List<ClickStream> getClickStreamList() {
+
+ List<ClickStream> clickthroughs = new ArrayList<>();
+ List<SessionNode> viewnodes = this.getViewNodes(this.root);
+ for (int i = 0; i < viewnodes.size(); i++) {
+
+ SessionNode viewnode = viewnodes.get(i);
+ SessionNode parent = viewnode.getParent();
+ List<SessionNode> children = viewnode.getChildren();
+
+ if (!"datasetlist".equals(parent.getKey())) {
+ continue;
+ }
+
+ RequestUrl requestURL = new RequestUrl();
+ String viewquery = "";
+ try {
+ String infoStr = requestURL.getSearchInfo(viewnode.getRequest());
+ viewquery = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+ } catch (UnsupportedEncodingException | InterruptedException | ExecutionException e) {
+ LOG.warn("Exception getting search info. Ignoring...", e);
+ }
+
+ String dataset = viewnode.getDatasetId();
+ boolean download = false;
+ for (int j = 0; j < children.size(); j++) {
+ SessionNode child = children.get(j);
+ if ("ftp".equals(child.getKey())) {
+ download = true;
+ break;
+ }
+ }
+
+ if (viewquery != null && !"".equals(viewquery)) {
+ String[] queries = viewquery.trim().split(",");
+ if (queries.length > 0) {
+ for (int k = 0; k < queries.length; k++) {
+ ClickStream data = new ClickStream(queries[k], dataset, download);
+ data.setSessionId(this.sessionID);
+ data.setType(this.cleanupType);
+ clickthroughs.add(data);
+ }
+ }
+ }
+ }
+
+ return clickthroughs;
+ }
+
+ /**
+ * searchParentNode:Get parent node of a session node
+ *
+ * @param node {@link SessionNode}
+ * @return node {@link SessionNode}
+ */
+ private SessionNode searchParentNode(SessionNode node) {
+
+ String nodeKey = node.getKey();
+
+ if ("datasetlist".equals(nodeKey)) {
+ if ("-".equals(node.getReferer())) {
+ return root;
+ } else {
+ SessionNode tmp = this.findLatestRefer(tmpnode, node.getReferer());
+ if (tmp == null) {
+ return root;
+ } else {
+ return tmp;
+ }
+ }
+ } else if ("dataset".equals(nodeKey)) {
+ if ("-".equals(node.getReferer())) {
+ return null;
+ } else {
+ return this.findLatestRefer(tmpnode, node.getReferer());
+ }
+ } else if ("ftp".equals(nodeKey)) {
+ return latestDatasetnode;
+ }
+
+ return tmpnode;
+ }
+
+ /**
+ * findLatestRefer: Find parent node whose visiting url is equal to the refer
+ * url of a session node
+ *
+ * @param node: {@link SessionNode}
+ * @param refer: request url
+   * @return the matching parent node, or null if none is found
+ */
+ private SessionNode findLatestRefer(SessionNode node, String refer) {
+ while (true) {
+ if ("root".equals(node.getKey())) {
+ return null;
+ }
+ SessionNode parentNode = node.getParent();
+ if (refer.equals(parentNode.getRequest())) {
+ return parentNode;
+ }
+
+ SessionNode tmp = this.iterChild(parentNode, refer);
+ if (tmp == null) {
+ node = parentNode;
+ continue;
+ } else {
+ return tmp;
+ }
+ }
+ }
+
+ /**
+   * iterChild: Search the subtree under a node for the latest child whose
+   * request equals the given refer url.
+   *
+   * @param start node whose children are searched
+   * @param refer request url to match
+   * @return the matching node, or null if none is found
+ */
+ private SessionNode iterChild(SessionNode start, String refer) {
+ List<SessionNode> children = start.getChildren();
+ for (int i = children.size() - 1; i >= 0; i--) {
+ SessionNode tmp = children.get(i);
+ if (tmp.getChildren().isEmpty()) {
+ if (refer.equals(tmp.getRequest())) {
+ return tmp;
+ } else {
+ continue;
+ }
+      } else {
+        // return a match found deeper in the subtree
+        SessionNode found = iterChild(tmp, refer);
+        if (found != null) {
+          return found;
+        }
+      }
+ }
+
+ return null;
+ }
+
+ /**
+   * check: Test whether any of the given children has the given key.
+   *
+   * @param children child nodes to inspect
+   * @param str node key to look for
+   * @return true if a child with that key exists
+ */
+ private boolean check(List<SessionNode> children, String str) {
+ for (int i = 0; i < children.size(); i++) {
+ if (children.get(i).key.equals(str)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+   * insertHelperChildren: Try to insert an entry under each of the given children.
+   *
+   * @param entry node to insert
+   * @param children candidate parent nodes
+   * @return true if the entry was inserted
+ */
+ private boolean insertHelperChildren(SessionNode entry, List<SessionNode> children) {
+ for (int i = 0; i < children.size(); i++) {
+ boolean result = insertHelper(entry, children.get(i));
+ if (result) {
+ return result;
+ }
+ }
+ return false;
+
+ }
+
+ /**
+   * insertHelper: Insert an entry under a node according to the
+   * datasetlist/dataset/ftp hierarchy.
+   *
+   * @param entry node to insert
+   * @param node candidate parent node
+   * @return true if the entry was inserted
+ */
+ private boolean insertHelper(SessionNode entry, SessionNode node) {
+ if ("datasetlist".equals(entry.key) || "dataset".equals(entry.key)) {
+ if ("datasetlist".equals(node.key)) {
+ if (node.children.isEmpty()) {
+ node.children.add(entry);
+ return true;
+ } else {
+ boolean flag = check(node.children, "datasetlist");
+ if (!flag) {
+ node.children.add(entry);
+ return true;
+ } else {
+ insertHelperChildren(entry, node.children);
+ }
+ }
+ } else {
+ insertHelperChildren(entry, node.children);
+ }
+ } else if ("ftp".equals(entry.key)) {
+ if ("dataset".equals(node.key)) {
+ if (node.children.isEmpty()) {
+ node.children.add(entry);
+ return true;
+ } else {
+ boolean flag = check(node.children, "dataset");
+ if (!flag) {
+ node.children.add(entry);
+ return true;
+ } else {
+ insertHelperChildren(entry, node.children);
+ }
+ }
+ } else {
+ insertHelperChildren(entry, node.children);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * getViewNodes: Get a session node's child nodes whose key is "dataset".
+ *
+   * @param node root of the subtree to search
+   * @return a list of session nodes
+ */
+ private List<SessionNode> getViewNodes(SessionNode node) {
+
+ List<SessionNode> viewnodes = new ArrayList<>();
+ if ("dataset".equals(node.getKey())) {
+ viewnodes.add(node);
+ }
+
+ if (!node.children.isEmpty()) {
+ for (int i = 0; i < node.children.size(); i++) {
+ SessionNode childNode = node.children.get(i);
+ viewnodes.addAll(getViewNodes(childNode));
+ }
+ }
+
+ return viewnodes;
+ }
+
+ private List<SessionNode> getQueryNodes(SessionNode node) {
+ return this.getNodes(node, "datasetlist");
+ }
+
+ private List<SessionNode> getNodes(SessionNode node, String nodeKey) {
+
+ List<SessionNode> nodes = new ArrayList<>();
+ if (node.getKey().equals(nodeKey)) {
+ nodes.add(node);
+ }
+
+ if (!node.children.isEmpty()) {
+ for (int i = 0; i < node.children.size(); i++) {
+ SessionNode childNode = node.children.get(i);
+ nodes.addAll(getNodes(childNode, nodeKey));
+ }
+ }
+
+ return nodes;
+ }
+
+ /**
+ * Obtain the ranking training data.
+ *
+   * @param indexName the index from which to obtain the data
+ * @param sessionID a valid session identifier
+   * @return a list of {@link RankingTrainData}
+ * @throws UnsupportedEncodingException if there is an error whilst
+ * processing the ranking training data.
+ */
+ public List<RankingTrainData> getRankingTrainData(String indexName, String sessionID) throws UnsupportedEncodingException {
+
+ List<RankingTrainData> trainDatas = new ArrayList<>();
+
+ List<SessionNode> queryNodes = this.getQueryNodes(this.root);
+ for (int i = 0; i < queryNodes.size(); i++) {
+ SessionNode querynode = queryNodes.get(i);
+ List<SessionNode> children = querynode.getChildren();
+
+ LinkedHashMap<String, Boolean> datasetOpt = new LinkedHashMap<>();
+ int ndownload = 0;
+ for (int j = 0; j < children.size(); j++) {
+ SessionNode node = children.get(j);
+ if ("dataset".equals(node.getKey())) {
+ Boolean bDownload = false;
+ List<SessionNode> nodeChildren = node.getChildren();
+ int childSize = nodeChildren.size();
+ for (int k = 0; k < childSize; k++) {
+ if ("ftp".equals(nodeChildren.get(k).getKey())) {
+ bDownload = true;
+ ndownload += 1;
+ break;
+ }
+ }
+ datasetOpt.put(node.datasetId, bDownload);
+ }
+ }
+
+ // Method 1: datasets that were downloaded are ranked higher than datasets that were only viewed.
+ if (datasetOpt.size() > 1 && ndownload > 0) {
+ // query
+ RequestUrl requestURL = new RequestUrl();
+ String queryUrl = querynode.getRequest();
+ String infoStr = requestURL.getSearchInfo(queryUrl);
+ String query = null;
+ try {
+ query = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Error performing custom analyzing", e);
+ }
+ Map<String, String> filter = RequestUrl.getFilterInfo(queryUrl);
+
+ for (String datasetA : datasetOpt.keySet()) {
+ Boolean bDownloadA = datasetOpt.get(datasetA);
+ if (bDownloadA) {
+ for (String datasetB : datasetOpt.keySet()) {
+ Boolean bDownloadB = datasetOpt.get(datasetB);
+ if (!bDownloadB) {
+
+ String[] queries = query.split(",");
+ for (int l = 0; l < queries.length; l++) {
+ RankingTrainData trainData = new RankingTrainData(queries[l], datasetA, datasetB);
+
+ trainData.setSessionId(this.sessionID);
+ trainData.setIndex(indexName);
+ trainData.setFilter(filter);
+ trainDatas.add(trainData);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return trainDatas;
+ }
+}
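For illustration, the pairwise construction performed by getRankingTrainData() above boils down to the following standalone sketch; the dataset identifiers are made up and the surrounding session-tree and Elasticsearch machinery is omitted:

import java.util.LinkedHashMap;
import java.util.Map;

public class PairwiseSketch {
  public static void main(String[] args) {
    // true = the dataset was downloaded in this session, false = only viewed.
    Map<String, Boolean> datasetOpt = new LinkedHashMap<>();
    datasetOpt.put("DATASET-A", true);
    datasetOpt.put("DATASET-B", false);
    datasetOpt.put("DATASET-C", false);

    // Every downloaded dataset is preferred over every dataset that was only
    // viewed, yielding one training pair per (downloaded, viewed) combination.
    for (Map.Entry<String, Boolean> a : datasetOpt.entrySet()) {
      if (!a.getValue()) {
        continue;
      }
      for (Map.Entry<String, Boolean> b : datasetOpt.entrySet()) {
        if (!b.getValue()) {
          // Prints "DATASET-A > DATASET-B" and "DATASET-A > DATASET-C".
          System.out.println(a.getKey() + " > " + b.getKey());
        }
      }
    }
  }
}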
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java
new file mode 100644
index 0000000..e3392e4
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import java.io.Serializable;
+
+/**
+ * This class represents an Apache access log line. See
+ * http://httpd.apache.org/docs/2.2/logs.html for more details.
+ */
+public class WebLog implements Serializable {
+ String LogType;
+ String IP;
+ String Time;
+ String Request;
+ double Bytes;
+
+ public String getLogType() {
+ return this.LogType;
+ }
+
+ public String getIP() {
+ return this.IP;
+ }
+
+ public String getTime() {
+ return this.Time;
+ }
+
+ public String getRequest() {
+ return this.Request;
+ }
+
+ public double getBytes() {
+ return this.Bytes;
+ }
+
+ public WebLog() {
+
+ }
+
+ /**
+ * Replace the English month abbreviation in a log time string with its
+ * numeric value, e.g. "Jan" becomes "1" and "Oct" becomes "10".
+ *
+ * @param time a time string taken from a raw log entry
+ * @return the time string with the month expressed as a number
+ */
+ public static String SwithtoNum(String time) {
+ String[] months = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
+ for (int i = 0; i < months.length; i++) {
+ if (time.contains(months[i])) {
+ return time.replace(months[i], String.valueOf(i + 1));
+ }
+ }
+ return time;
+ }
+
+ /**
+ * Check whether a log entry string carries content, i.e. is not the empty
+ * JSON object "{}".
+ *
+ * @param s the string to check
+ * @return false if the string is "{}", true otherwise
+ */
+ public static boolean checknull(String s) {
+ return !"{}".equals(s);
+ }
+
+}
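A short usage sketch for the helpers above; the timestamp is hypothetical:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import gov.nasa.jpl.mudrod.weblog.structure.WebLog;

public class WebLogTimeSketch {
  public static void main(String[] args) throws ParseException {
    String raw = "27/Oct/2017:22:34:55";
    // Month abbreviation is replaced by its numeric value: "27/10/2017:22:34:55".
    String numeric = WebLog.SwithtoNum(raw);
    Date date = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(numeric);
    System.out.println(date);
  }
}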
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java
new file mode 100644
index 0000000..7aa9898
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the data structures needed for web log analysis.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/config.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/config.xml b/core/src/main/resources/config.xml
new file mode 100644
index 0000000..62ac7a6
--- /dev/null
+++ b/core/src/main/resources/config.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0 Unless
+
+ required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS"
+ BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ express or implied. See the License for the specific language
+ governing permissions and limitations under the License.
+-->
+<Config>
+ <para name="Cleanup_type_prefix">cleanupLog</para>
+
+ <para name="clickStreamLinkageType">ClickStreamLinkage</para>
+
+ <para name="clickStreamMatrixType">clickstreamMatrix</para>
+
+ <para name="clickstreamSVDDimension">50</para>
+
+ <para name="clickStream_w">2</para>
+
+ <para name="commentType">comment</para>
+
+ <para name="downloadf">100</para>
+
+ <para name="downloadWeight">3</para>
+
+ <para name="clusterName">MudrodES</para>
+
+ <para name="ES_Transport_TCP_Port">9300</para>
+
+ <para name="ES_unicast_hosts">127.0.0.1</para>
+
+ <para name="ES_HTTP_port">9200</para>
+
+ <para name="indexName">mudrod</para>
+
+ <para name="ftpPrefix">FTP.</para>
+
+ <para name="FTP_type_prefix">rawftp</para>
+
+ <para name="HTTP_type_prefix">rawhttp</para>
+
+ <para name="httpPrefix">WWW.</para>
+
+ <para name="logIndexName">podaaclog</para>
+
+ <para name="metadataLinkageType">MetadataLinkage</para>
+
+ <para name="metadataSVDDimension">50</para>
+
+ <para name="metadataurl">null</para>
+
+ <para name="metadata_w">1</para>
+
+ <para name="mini_userHistory">5</para>
+
+ <!--
+ The ontology service implementation. Possible values include
+ EsipPortal - EsipPortalOntology
+ EsipCOR - EsipCOROntology
+ Local - gov.nasa.jpl.mudrod.ontology.process.Local
+ -->
+ <para name="mudrod.ontology.implementation">Local</para>
+
+ <para name="ontologyLinkageType">SWEETLinkage</para>
+
+ <para name="ontology_w">2</para>
+
+ <!--
+ Log processing type. Possible values include
+ 'sequential' or 'parallel'.
+ -->
+ <para name="processingType">parallel</para>
+
+ <para name="raw_metadataType">RawMetadata</para>
+
+ <para name="searchf">100</para>
+
+ <para name="sendingrate">30</para>
+
+ <para name="SessionPort">8080</para>
+
+ <para name="SessionStats_prefix">sessionstats</para>
+
+ <para name="SessionUrl">/mudrod-service/session.html</para>
+
+ <!-- The name of your application. This will appear in the UI and in log data.-->
+ <para name="spark.app.name">MudrodSparkApp</para>
+
+ <!--
+ The default Spark cluster manager to connect to. See the list of allowed master URLs.
+ For more information, consult http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
+ -->
+ <para name="spark.master">local[4]</para>
+
+ <!-- ${svmSgdModel.value} is resolved at build time. See the property in core/pom.xml for the value -->
+ <para name="svmSgdModel">${svmSgdModel.value}.zip</para>
+
+ <para name="timegap">600</para>
+
+ <para name="userHistoryLinkageType">UserHistoryLinkage</para>
+
+ <para name="userHistory_w">2</para>
+
+ <para name="viewf">200</para>
+
+
+
+ <!-- FOLLOWING NEEDS TO BE ADDED TO MudrodConstants.java -->
+ <para name="recom_metadataType">RecomMetadata</para>
+ <!-- recommendation -->
+ <para name="metadataTermTFIDFSimType">MetadataTermTFIDFSim</para>
+ <para name="metadataWordTFIDFSimType">MetadataWordTFIDFSim</para>
+ <para name="metadataCodeSimType">MetadataCodeSim</para>
+ <para name="metadataSessionBasedSimType">MetadataSBSim</para>
+ <para name="metadataTopicSimType">MetadataTBSim</para>
+ <!--
+ Log processing parallel optimization type. Possible values include
+ default - MudrodConstants.PARALLEL_OPTIMIZATION_DEFAULT
+ repartition - MudrodConstants.PARALLEL_OPTIMIZATION_REPARTITION
+ -->
+ <para name="parallelOptimization">repartition</para>
+
+</Config>
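As a minimal sketch, a <para>-style configuration like the one above could be read into a java.util.Properties object with the standard DOM API; this loader is an illustration only, not necessarily how Mudrod itself consumes the file:

import java.io.InputStream;
import java.util.Properties;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class ConfigSketch {
  public static Properties load(InputStream in) throws Exception {
    Properties props = new Properties();
    Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
    // Each <para name="..."> element becomes one property entry.
    NodeList paras = doc.getElementsByTagName("para");
    for (int i = 0; i < paras.getLength(); i++) {
      Element para = (Element) paras.item(i);
      props.setProperty(para.getAttribute("name"), para.getTextContent().trim());
    }
    return props;
  }
}

With a loader along these lines, props.getProperty("timegap") would yield "600" for the file above.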
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/elastic_mappings.json
----------------------------------------------------------------------
diff --git a/core/src/main/resources/elastic_mappings.json b/core/src/main/resources/elastic_mappings.json
new file mode 100644
index 0000000..685f49e
--- /dev/null
+++ b/core/src/main/resources/elastic_mappings.json
@@ -0,0 +1,68 @@
+{
+ "_default_": {
+ "properties": {
+ "keywords": {
+ "type": "text",
+ "analyzer": "csv",
+ "fielddata": true
+ },
+ "views": {
+ "type": "string",
+ "analyzer": "csv"
+ },
+ "downloads": {
+ "type": "string",
+ "analyzer": "csv"
+ },
+ "RequestUrl": {
+ "type": "string",
+ "include_in_all": false,
+ "index": "no"
+ },
+ "IP": {
+ "type": "keyword",
+ "index": "not_analyzed"
+ },
+ "Browser": {
+ "type": "string",
+ "include_in_all": false,
+ "index": "no"
+ },
+ "SessionURL": {
+ "type": "string",
+ "include_in_all": false,
+ "index": "no"
+ },
+ "Referer": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "SessionID": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "Response": {
+ "type": "string",
+ "include_in_all": false,
+ "index": "no"
+ },
+ "Request": {
+ "type": "string",
+ "include_in_all": false,
+ "index": "no"
+ },
+ "Coordinates": {
+ "type": "geo_point",
+ "include_in_all": false,
+ "index": "no"
+ },
+ "LogType": {
+ "type": "string",
+ "index": "not_analyzed"
+ },
+ "Dataset-Metadata": {
+ "type": "completion"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/elastic_settings.json
----------------------------------------------------------------------
diff --git a/core/src/main/resources/elastic_settings.json b/core/src/main/resources/elastic_settings.json
new file mode 100644
index 0000000..f5faa3e
--- /dev/null
+++ b/core/src/main/resources/elastic_settings.json
@@ -0,0 +1,36 @@
+{
+ "index": {
+ "number_of_replicas": 0,
+ "refresh_interval": "-1",
+ "number_of_shards": "5",
+ "translog.flush_threshold_size": "1g",
+ "translog.sync_interval": "30s",
+ "warmer.enabled": "false"
+ },
+ "analysis": {
+ "filter": {
+ "cody_stop": {
+ "type": "stop",
+ "stopwords": "_english_"
+ },
+ "cody_stemmer": {
+ "type": "stemmer",
+ "language": "light_english"
+ }
+ },
+ "analyzer": {
+ "cody": {
+ "tokenizer": "standard",
+ "filter": [
+ "lowercase",
+ "cody_stop",
+ "cody_stemmer"
+ ]
+ },
+ "csv": {
+ "type": "pattern",
+ "pattern": ","
+ }
+ }
+ }
+}
\ No newline at end of file
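The "csv" entry above defines a pattern analyzer that splits field values on commas; the Elasticsearch pattern analyzer also lowercases by default. Roughly, its effect can be approximated in plain Java as follows; the sample keyword string is made up:

public class CsvAnalyzerSketch {
  public static void main(String[] args) {
    String keywords = "Ocean Winds,Sea Surface Temperature,Salinity";
    // Split on the comma pattern and lowercase, mirroring the analyzer's behaviour.
    for (String token : keywords.toLowerCase().split(",")) {
      System.out.println(token); // ocean winds / sea surface temperature / salinity
    }
  }
}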
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/_SUCCESS
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/_SUCCESS b/core/src/main/resources/javaSVMWithSGDModel/data/_SUCCESS
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata b/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata
new file mode 100644
index 0000000..cafbf1b
Binary files /dev/null and b/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata differ
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/_metadata
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/_metadata b/core/src/main/resources/javaSVMWithSGDModel/data/_metadata
new file mode 100644
index 0000000..6bfec98
Binary files /dev/null and b/core/src/main/resources/javaSVMWithSGDModel/data/_metadata differ
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet b/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet
new file mode 100644
index 0000000..5033301
Binary files /dev/null and b/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet differ
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/metadata/_SUCCESS
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/metadata/_SUCCESS b/core/src/main/resources/javaSVMWithSGDModel/metadata/_SUCCESS
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000 b/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000
new file mode 100644
index 0000000..c972cbe
--- /dev/null
+++ b/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000
@@ -0,0 +1 @@
+{"class":"org.apache.spark.mllib.classification.SVMModel","version":"1.0","numFeatures":6,"numClasses":2}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/log4j.properties b/core/src/main/resources/log4j.properties
new file mode 100644
index 0000000..1e6e84d
--- /dev/null
+++ b/core/src/main/resources/log4j.properties
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0 Unless
+#
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS"
+# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language
+# governing permissions and limitations under the License.
+# Define some default values that can be overridden by system properties
+# Logging Threshold
+mudrod.root.logger=INFO,DRFA, stdout
+mudrod.log.dir=.
+mudrod.log.file=mudrod.log
+log4j.threshold=ALL
+# RootLogger - DailyRollingFileAppender
+log4j.rootLogger=${mudrod.root.logger}
+#special logging requirements for some commandline tools
+log4j.logger.MudrodEngine=INFO,cmdstdout
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${mudrod.log.dir}/${mudrod.log.file}
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+# Debugging Pattern format: Date LogLevel LoggerName (FileName:MethodName:LineNo) LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# stdout
+# Add *stdout* to rootlogger above if you want to use this
+#
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# plain layout used for commandline tools to output to console
+#
+log4j.appender.cmdstdout=org.apache.log4j.ConsoleAppender
+log4j.appender.cmdstdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.cmdstdout.layout.ConversionPattern=%m%n
+#
+# Rolling File Appender
+#
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+# Custom Logging levels
+log4j.logger.akka=WARN
+log4j.logger.org.apache=WARN
+log4j.logger.gov.nasa.jpl.mudrod=INFO
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/log4j2.properties b/core/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..1e6e84d
--- /dev/null
+++ b/core/src/main/resources/log4j2.properties
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0 Unless
+#
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS"
+# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language
+# governing permissions and limitations under the License.
+# Define some default values that can be overridden by system properties
+# Logging Threshold
+mudrod.root.logger=INFO,DRFA, stdout
+mudrod.log.dir=.
+mudrod.log.file=mudrod.log
+log4j.threshold=ALL
+# RootLogger - DailyRollingFileAppender
+log4j.rootLogger=${mudrod.root.logger}
+#special logging requirements for some commandline tools
+log4j.logger.MudrodEngine=INFO,cmdstdout
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${mudrod.log.dir}/${mudrod.log.file}
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+# Debugging Pattern format: Date LogLevel LoggerName (FileName:MethodName:LineNo) LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# stdout
+# Add *stdout* to rootlogger above if you want to use this
+#
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# plain layout used for commandline tools to output to console
+#
+log4j.appender.cmdstdout=org.apache.log4j.ConsoleAppender
+log4j.appender.cmdstdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.cmdstdout.layout.ConversionPattern=%m%n
+#
+# Rolling File Appender
+#
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+# Custom Logging levels
+log4j.logger.akka=WARN
+log4j.logger.org.apache=WARN
+log4j.logger.gov.nasa.jpl.mudrod=INFO