Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:34:55 UTC

[14/21] incubator-sdap-mudrod git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java
new file mode 100644
index 0000000..bbfb79c
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/RequestUrl.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ClassName: RequestUrl Function: request URL related operations
+ */
+public class RequestUrl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RequestUrl.class);
+
+  /**
+   * Default Constructor
+   */
+  public RequestUrl() {
+    /* Default Constructor */
+  }
+
+  /**
+   * UrlPage: Get url page from url link
+   *
+   * @param strURL request url
+   * @return page name
+   */
+  public static String urlPage(String strURL) {
+    String strPage = null;
+    String newURL = strURL.trim().toLowerCase();
+
+    String[] arrSplit = newURL.split("[?]");
+    if (newURL.length() > 0 && arrSplit.length > 1 && arrSplit[0] != null) {
+      strPage = arrSplit[0];
+    }
+
+    return strPage;
+  }
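+  // Example (hypothetical): urlPage("/datasetlist?search=ocean") returns "/datasetlist";
+  // a URL without a query string returns null.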
+
+  /**
+   * TruncateUrlPage: Get url params from url link
+   *
+   * @param strURL request url
+   * @return url params
+   */
+  private static String truncateUrlPage(String strURL) {
+    String strAllParam = null;
+
+    // note: the whole URL, including parameter values, is lower-cased here
+    strURL = strURL.trim().toLowerCase();
+
+    String[] arrSplit = strURL.split("[?]");
+    if (strURL.length() > 1 && arrSplit.length > 1 && arrSplit[1] != null) {
+      strAllParam = arrSplit[1];
+    }
+
+    return strAllParam;
+  }
+
+  /**
+   * URLRequest: Get url params from url link in a map format
+   *
+   * @param URL request url
+   * @return url params key value map
+   */
+  public static Map<String, String> uRLRequest(String URL) {
+    Map<String, String> mapRequest = new HashMap<>();
+
+    String strUrlParam = truncateUrlPage(URL);
+    if (strUrlParam == null) {
+      return mapRequest;
+    }
+
+    String[] arrSplit = strUrlParam.split("[&]");
+    for (String strSplit : arrSplit) {
+      String[] arrSplitEqual = strSplit.split("[=]");
+
+      if (arrSplitEqual.length > 1) {
+        mapRequest.put(arrSplitEqual[0], arrSplitEqual[1]);
+      } else if (!arrSplitEqual[0].isEmpty()) {
+        // a parameter without '=' maps to an empty value
+        mapRequest.put(arrSplitEqual[0], "");
+      }
+    }
+    return mapRequest;
+  }
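+  // Example (hypothetical): uRLRequest("/datasetlist?search=ocean%20wind&ids=sensor")
+  // returns {search=ocean%20wind, ids=sensor}; values stay URL-encoded here.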
+
+  /**
+   * GetSearchInfo: Get search information from url link
+   *
+   * @param URL request url
+   * @return search params
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  public String getSearchInfo(String URL) throws UnsupportedEncodingException {
+    List<String> info = new ArrayList<>();
+    String keyword = "";
+    Map<String, String> mapRequest = RequestUrl.uRLRequest(URL);
+    if (mapRequest.get("search") != null) {
+      try {
+        keyword = mapRequest.get("search");
+
+        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+        if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
+          keyword = keyword.replace("%2b", " ");
+          keyword = keyword.replace("%20", " ");
+          keyword = keyword.replace("%25", " ");
+        }
+
+        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+
+      } catch (UnsupportedEncodingException e) {
+        LOG.error("Failed to decode search param: {}", mapRequest.get("search"), e);
+      }
+      if (!"".equals(keyword)) {
+        info.add(keyword);
+      }
+
+    }
+
+    if (mapRequest.get("ids") != null && mapRequest.get("values") != null) {
+      String idRaw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8");
+      String valueRaw = URLDecoder.decode(mapRequest.get("values"), "UTF-8");
+      String[] ids = idRaw.split(":");
+      String[] values = valueRaw.split(":");
+
+      int len = Math.min(ids.length, values.length);
+
+      for (int i = 0; i < len; i++) {
+        if (ids[i].equals("collections") || ids[i].equals("measurement") || ids[i].equals("sensor") || ids[i].equals("platform") || ids[i].equals("variable") || ids[i].equals("spatialcoverage")) {
+          try {
+            values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+            if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) {
+              String item = URLDecoder.decode(values[i], "UTF-8").trim();
+              if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) {
+                item = item.replace("%2b", " ");
+                item = item.replace("%20", " ");
+                item = item.replace("%25", " ");
+              }
+              item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+              info.add(item);
+            }
+          } catch (Exception e) {
+            LOG.error("Failed to decode filter value: {}", values[i], e);
+          }
+        }
+
+      }
+    }
+
+    return String.join(",", info);
+  }
+
+  /**
+   * GetSearchWord: Get search words from url link
+   *
+   * @param url request url
+   * @return query
+   */
+  public static String getSearchWord(String url) {
+    String keyword = "";
+
+    Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
+    if (mapRequest.get("search") != null) {
+      try {
+        keyword = mapRequest.get("search");
+
+        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+        if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
+          keyword = keyword.replace("%2b", " ");
+          keyword = keyword.replace("%20", " ");
+          keyword = keyword.replace("%25", " ");
+        }
+        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+      } catch (UnsupportedEncodingException e) {
+        LOG.error("Failed to decode search param: {}", mapRequest.get("search"), e);
+      }
+    }
+
+    return keyword;
+  }
+
+  /**
+   * GetFilterInfo: Get filter params from url link
+   *
+   * @param url request url
+   * @return filter facet key pair map
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  public static Map<String, String> getFilterInfo(String url) throws UnsupportedEncodingException {
+    Map<String, String> filterValues = new HashMap<>();
+
+    String keyword = "";
+    Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
+    if (mapRequest.get("search") != null) {
+      try {
+        keyword = mapRequest.get("search");
+
+        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+        if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
+          keyword = keyword.replace("%2b", " ");
+          keyword = keyword.replace("%20", " ");
+          keyword = keyword.replace("%25", " ");
+        }
+        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+
+      } catch (UnsupportedEncodingException e) {
+        LOG.error("Failed to decode search param: {}", mapRequest.get("search"), e);
+      }
+      if (!"".equals(keyword)) {
+        info.add(keyword);
+      }
+
+    }
+
+    if (mapRequest.get("ids") != null && mapRequest.get("values") != null) {
+      String idRaw = URLDecoder.decode(mapRequest.get("ids"), "UTF-8");
+      String valueRaw = URLDecoder.decode(mapRequest.get("values"), "UTF-8");
+      String[] ids = idRaw.split(":");
+      String[] values = valueRaw.split(":");
+
+      int len = Math.min(ids.length, values.length);
+
+      for (int i = 0; i < len; i++) {
+        try {
+          values[i] = values[i].replaceAll("%(?![0-9a-fA-F]{2})", "%25");
+          if (!URLDecoder.decode(values[i], "UTF-8").equals(keyword) && !URLDecoder.decode(values[i], "UTF-8").equals("")) {
+            String item = URLDecoder.decode(values[i], "UTF-8").trim();
+            if (item.contains("%2b") || item.contains("%20") || item.contains("%25")) {
+              item = item.replace("%2b", " ");
+              item = item.replace("%20", " ");
+              item = item.replace("%25", " ");
+            }
+            item = item.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
+            filterValues.put(ids[i], item);
+          }
+        } catch (Exception e) {
+          LOG.error("Failed to decode filter value: {}", values[i], e);
+        }
+      }
+    }
+
+    if (mapRequest.get("temporalsearch") != null) {
+      String temporalsearch = mapRequest.get("temporalsearch");
+      temporalsearch = URLDecoder.decode(temporalsearch.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
+
+      filterValues.put("temporalsearch", temporalsearch);
+    }
+
+    return filterValues;
+  }
+
+}
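
A minimal usage sketch of the RequestUrl helpers above (URL and keywords are hypothetical):

    Map<String, String> params = RequestUrl.uRLRequest("/datasetlist?search=sea%20surface&ids=sensor&values=modis");
    // params: {search=sea%20surface, ids=sensor, values=modis}

    String query = RequestUrl.getSearchWord("/datasetlist?search=sea%2Bsurface");
    // query: "sea surface" -- '+', '%20' and similar encoding artifacts are normalised to spaces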

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java
new file mode 100644
index 0000000..93f4288
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/Session.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ClassName: Session Function: Session operations.
+ */
+public class Session implements Comparable<Session> {
+  private static final Logger LOG = LoggerFactory.getLogger(Session.class);
+  // start: start time of session
+  private String start;
+  // end: end time of session
+  private String end;
+  // id: original session ID
+  private String id;
+  // newid: new session ID
+  private String newid = null;
+  // fmt: time formatter
+  private DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+
+  private ESDriver es;
+  private Properties props;
+
+  /**
+   * Creates a new instance of Session.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch driver
+   * @param start start time of session
+   * @param end   end time of session
+   * @param id    session ID
+   */
+  public Session(Properties props, ESDriver es, String start, String end, String id) {
+    this.start = start;
+    this.end = end;
+    this.id = id;
+
+    this.props = props;
+    this.es = es;
+  }
+
+  /**
+   * Creates a new instance of Session.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch driver
+   */
+  public Session(Properties props, ESDriver es) {
+    this.props = props;
+    this.es = es;
+  }
+
+  /**
+   * getID: Get original session ID
+   *
+   * @return session id
+   */
+  public String getID() {
+    return id;
+  }
+
+  /**
+   * getNewID: Get new session ID
+   *
+   * @return new session id
+   */
+  public String getNewID() {
+    return newid;
+  }
+
+  /**
+   * setNewID: Set new session ID
+   *
+   * @param str session ID
+   * @return new session id
+   */
+  public String setNewID(String str) {
+    return newid = str;
+  }
+
+  /**
+   * getStartTime:Get start time of current session
+   *
+   * @return start time of session
+   */
+  public String getStartTime() {
+    return start;
+  }
+
+  /**
+   * getEndTime:Get end time of current session
+   *
+   * @return end time of session
+   */
+  public String getEndTime() {
+    return end;
+  }
+
+  /**
+   * Compare current session with another session
+   *
+   * @see java.lang.Comparable#compareTo(java.lang.Object)
+   */
+  @Override
+  public int compareTo(Session o) {
+    // ascending order by session end time
+    return Seconds.secondsBetween(fmt.parseDateTime(o.end), fmt.parseDateTime(this.end)).getSeconds();
+  }
+
+  /**
+   * getSessionDetail: Get details of the current session, used for session
+   * tree reconstruction
+   *
+   * @param indexName name of index from which you wish to obtain session detail.
+   * @param type      session type name in Elasticsearch
+   * @param sessionID session ID
+   * @return session details in Json format
+   */
+  public JsonObject getSessionDetail(String indexName, String type, String sessionID) {
+    JsonObject sessionResults = new JsonObject();
+    // for session tree
+    SessionTree tree = null;
+    JsonElement jsonRequest = null;
+    try {
+      tree = this.getSessionTree(indexName, type, sessionID);
+      JsonObject jsonTree = tree.treeToJson(tree.root);
+      sessionResults.add("treeData", jsonTree);
+
+      jsonRequest = this.getRequests(type, sessionID);
+      sessionResults.add("RequestList", jsonRequest);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Encoding error detected.", e);
+
+    }
+
+    return sessionResults;
+  }
+
+  /**
+   * getClickStreamList: Extract the click stream list from the current session.
+   *
+   * @param indexName an index from which to query for a session list
+   * @param type      session type name in Elasticsearch
+   * @param sessionID session ID
+   * @return click stream data list
+   * {@link ClickStream}
+   */
+  public List<ClickStream> getClickStreamList(String indexName, String type, String sessionID) {
+    SessionTree tree;
+    try {
+      tree = this.getSessionTree(indexName, type, sessionID);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Error whilst obtaining the session tree.", e);
+      return new ArrayList<>();
+    }
+
+    return tree.getClickStreamList();
+  }
+
+  /**
+   * Method of converting a given session to a tree structure
+   *
+   * @param indexName name of the index storing the session
+   * @param type      session type name in Elasticsearch
+   * @param sessionID ID of session
+   * @return an instance of session tree structure
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  private SessionTree getSessionTree(String indexName, String type, String sessionID) throws UnsupportedEncodingException {
+
+    SearchResponse response = es.getClient().prepareSearch(indexName).setTypes(type).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100).addSort("Time", SortOrder.ASC)
+        .execute().actionGet();
+
+    SessionTree tree = new SessionTree(this.props, this.es, sessionID, type);
+    int seq = 1;
+    for (SearchHit hit : response.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String request = (String) result.get("Request");
+      String time = (String) result.get("Time");
+      String logType = (String) result.get("LogType");
+      String referer = (String) result.get("Referer");
+
+      SessionNode node = new SessionNode(request, logType, referer, time, seq);
+      tree.insert(node);
+      seq++;
+    }
+
+    return tree;
+  }
+
+  /**
+   * Method of getting all requests from a given session
+   *
+   * @param cleanuptype Session type name in Elasticsearch
+   * @param sessionID   Session ID
+   * @return all of these requests in JSON
+   * @throws UnsupportedEncodingException UnsupportedEncodingException
+   */
+  private JsonElement getRequests(String cleanuptype, String sessionID) throws UnsupportedEncodingException {
+    SearchResponse response = es.getClient().prepareSearch(props.getProperty("indexName")).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100)
+        .addSort("Time", SortOrder.ASC).execute().actionGet();
+
+    Gson gson = new Gson();
+    List<JsonObject> requestList = new ArrayList<>();
+    int seq = 1;
+    for (SearchHit hit : response.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String request = (String) result.get("Request");
+      String requestUrl = (String) result.get("RequestUrl");
+      String time = (String) result.get("Time");
+      String logType = (String) result.get("LogType");
+      String referer = (String) result.get("Referer");
+
+      JsonObject req = new JsonObject();
+      req.addProperty("Time", time);
+      req.addProperty("Request", request);
+      req.addProperty("RequestURL", requestUrl);
+      req.addProperty("LogType", logType);
+      req.addProperty("Referer", referer);
+      req.addProperty("Seq", seq);
+      requestList.add(req);
+
+      seq++;
+    }
+    return gson.toJsonTree(requestList);
+  }
+
+  /**
+   * getRankingTrainData: Extract ranking training data from the current session.
+   *
+   * @param indexName   an index from which to obtain ranking training data.
+   * @param cleanuptype session type name in Elasticsearch
+   * @param sessionID   session ID
+   * @return ranking training data list
+   * {@link RankingTrainData}
+   */
+  public List<RankingTrainData> getRankingTrainData(String indexName, String cleanuptype, String sessionID) {
+    SessionTree tree = null;
+    try {
+      tree = this.getSessionTree(indexName, cleanuptype, sessionID);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Error whilst retrieving session tree.", e);
+      return new ArrayList<>();
+    }
+
+    List<RankingTrainData> trainData = new ArrayList<>();
+    try {
+      trainData = tree.getRankingTrainData(indexName, sessionID);
+    } catch (UnsupportedEncodingException e) {
+      LOG.error("Error whilst retreiving ranking training data: {}", e);
+    }
+
+    return trainData;
+  }
+}
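
A short sketch of ordering sessions with the Comparable implementation above
(timestamps are hypothetical ISO-8601 values; props and es are assumed to be
configured elsewhere):

    List<Session> sessions = new ArrayList<>();
    sessions.add(new Session(props, es, "2017-01-01T00:00:00.000Z", "2017-01-01T00:10:00.000Z", "s1"));
    sessions.add(new Session(props, es, "2017-01-01T00:00:00.000Z", "2017-01-01T00:05:00.000Z", "s2"));
    Collections.sort(sessions); // ascending by end time: s2 comes before s1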

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java
new file mode 100644
index 0000000..edba32e
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionExtractor.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * ClassName: SessionExtractor Function: Extract session details from
+ * reconstructed sessions.
+ */
+public class SessionExtractor implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SessionExtractor.class);
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+
+  public SessionExtractor() {
+    // default constructor
+  }
+
+  /**
+   * extractClickStreamFromES:Extract click streams from logs stored in
+   * Elasticsearch
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @param spark
+   *          the spark driver
+   * @return clickstream list in JavaRDD format {@link ClickStream}
+   */
+  public JavaRDD<ClickStream> extractClickStreamFromES(Properties props, ESDriver es, SparkDriver spark) {
+    switch (props.getProperty(MudrodConstants.PROCESS_TYPE)) {
+      case "sequential":
+        List<ClickStream> queryList = this.getClickStreamList(props, es);
+        return spark.sc.parallelize(queryList);
+      case "parallel":
+        return getClickStreamListInParallel(props, spark, es);
+      default:
+        LOG.error("Error finding processing type for '{}'. Please check your config.xml.", props.getProperty(MudrodConstants.PROCESS_TYPE));
+    }
+    return null;
+  }
+
+  /**
+   * getClickStreamList:Extract click streams from logs stored in Elasticsearch.
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @return clickstream list {@link ClickStream}
+   */
+  protected List<ClickStream> getClickStreamList(Properties props, ESDriver es) {
+    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+    List<ClickStream> result = new ArrayList<>();
+    for (int n = 0; n < logIndexList.size(); n++) {
+      String logIndex = logIndexList.get(n);
+      List<String> sessionIdList;
+      try {
+        sessionIdList = this.getSessions(props, es, logIndex);
+        Session session = new Session(props, es);
+        int sessionNum = sessionIdList.size();
+        for (int i = 0; i < sessionNum; i++) {
+          String[] sArr = sessionIdList.get(i).split(",");
+          List<ClickStream> data = session.getClickStreamList(sArr[1], sArr[2], sArr[0]);
+          result.addAll(data);
+        }
+      } catch (Exception e) {
+        LOG.error("Error during extraction of Clickstreams from log index. {}", e);
+      }
+    }
+
+    return result;
+  }
+
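+  /**
+   * getClickStreamListInParallel: Extract click streams from logs stored in
+   * Elasticsearch, processing sessions in parallel with Spark.
+   *
+   * @param props the Mudrod configuration
+   * @param spark the spark driver
+   * @param es    the Elasticsearch driver
+   * @return clickstream list in JavaRDD format {@link ClickStream}
+   */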
+  protected JavaRDD<ClickStream> getClickStreamListInParallel(Properties props, SparkDriver spark, ESDriver es) {
+
+    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+    LOG.info("Retrieved {}", logIndexList.toString());
+
+    List<String> sessionIdList = new ArrayList<>();
+    for (int n = 0; n < logIndexList.size(); n++) {
+      String logIndex = logIndexList.get(n);
+      List<String> tmpsessionList = this.getSessions(props, es, logIndex);
+      sessionIdList.addAll(tmpsessionList);
+    }
+
+    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);
+
+    JavaRDD<ClickStream> clickStreamRDD = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, ClickStream>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<ClickStream> call(Iterator<String> arg0) throws Exception {
+        ESDriver tmpES = new ESDriver(props);
+        tmpES.createBulkProcessor();
+
+        Session session = new Session(props, tmpES);
+        List<ClickStream> clickstreams = new ArrayList<>();
+        while (arg0.hasNext()) {
+          String s = arg0.next();
+          String[] sArr = s.split(",");
+          List<ClickStream> clicks = session.getClickStreamList(sArr[1], sArr[2], sArr[0]);
+          clickstreams.addAll(clicks);
+        }
+        tmpES.destroyBulkProcessor();
+        tmpES.close();
+        return clickstreams.iterator();
+      }
+    });
+
+    LOG.info("Clickstream number: {}", clickStreamRDD.count());
+
+    return clickStreamRDD;
+  }
+
+  // This method is reserved and not currently used.
+
+  /**
+   * loadClickStremFromTxt: Load click streams from a txt file
+   *
+   * @param clickthroughFile
+   *          txt file
+   * @param sc
+   *          the spark context
+   * @return clickstream list in JavaRDD format {@link ClickStream}
+   */
+  public JavaRDD<ClickStream> loadClickStremFromTxt(String clickthroughFile, JavaSparkContext sc) {
+    return sc.textFile(clickthroughFile).flatMap(new FlatMapFunction<String, ClickStream>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Iterator<ClickStream> call(String line) throws Exception {
+        List<ClickStream> clickthroughs = (List<ClickStream>) ClickStream.parseFromTextLine(line);
+        // a List cannot be cast to an Iterator; return an iterator over it instead
+        return clickthroughs.iterator();
+      }
+    });
+  }
+
+  /**
+   * bulidDataQueryRDD: convert click stream list to dataset-query pairs.
+   *
+   * @param clickstreamRDD
+   *          click stream data
+   * @param downloadWeight
+   *          weight of download behavior
+   * @return JavaPairRDD, key is short name of data set, and values are queries
+   */
+  public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
+    return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, List<String>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, List<String>> call(ClickStream click) throws Exception {
+        List<String> query = new ArrayList<>();
+        // important! download behavior is given a higher weight
+        // than viewing behavior
+        boolean download = click.isDownload();
+        int weight = 1;
+        if (download) {
+          weight = downloadWeight;
+        }
+        for (int i = 0; i < weight; i++) {
+          query.add(click.getKeyWords());
+        }
+
+        return new Tuple2<>(click.getViewDataset(), query);
+      }
+    }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public List<String> call(List<String> v1, List<String> v2) throws Exception {
+        List<String> list = new ArrayList<>();
+        list.addAll(v1);
+        list.addAll(v2);
+        return list;
+      }
+    });
+  }
+
+  /**
+   * getSessions: Get sessions from logs
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @param logIndex
+   *          a log index name
+   * @return list of session names
+   */
+  protected List<String> getSessions(Properties props, ESDriver es, String logIndex) {
+
+    String cleanupPrefix = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
+    String sessionStatPrefix = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);
+
+    List<String> sessions = new ArrayList<>();
+    SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(sessionStatPrefix).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+            .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> session = hit.getSource();
+        String sessionID = (String) session.get("SessionID");
+        sessions.add(sessionID + "," + logIndex + "," + cleanupPrefix);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    return sessions;
+  }
+
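+  /**
+   * bulidUserItermRDD: Aggregate click stream data into (user, dataset) pairs,
+   * rating a pair 2.0 if the dataset was downloaded and 1.0 if it was only
+   * viewed, and keeping the highest rate per pair.
+   *
+   * @param clickstreamRDD click stream data
+   * @return JavaPairRDD, key is "user,dataset" and value is the rate
+   */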
+  public JavaPairRDD<String, Double> bulidUserItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
+    return clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Double> call(ClickStream click) throws Exception {
+        double rate = 1;
+        boolean download = click.isDownload();
+        if (download) {
+          rate = 2;
+        }
+
+        String sessionID = click.getSessionID();
+        String user = sessionID.split("@")[0];
+
+        return new Tuple2<>(user + "," + click.getViewDataset(), rate);
+      }
+    }).reduceByKey(new Function2<Double, Double, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Double call(Double v1, Double v2) throws Exception {
+        return Math.max(v1, v2);
+      }
+    });
+  }
+
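+  /**
+   * bulidSessionItermRDD: Extract distinct (session, dataset) pairs from click
+   * stream data, dropping sessions that viewed fewer than two datasets.
+   *
+   * @param clickstreamRDD click stream data
+   * @return JavaPairRDD, key is "sessionID,dataset" and value is 1.0
+   */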
+  public JavaPairRDD<String, Double> bulidSessionItermRDD(JavaRDD<ClickStream> clickstreamRDD) {
+    JavaPairRDD<String, String> sessionItemRDD = clickstreamRDD.mapToPair(new PairFunction<ClickStream, String, String>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, String> call(ClickStream click) throws Exception {
+
+        String sessionID = click.getSessionID();
+        return new Tuple2<>(sessionID, click.getViewDataset());
+      }
+    }).distinct();
+
+    // count datasets per session and drop sessions that viewed fewer than two
+    JavaPairRDD<String, Double> sessionItemNumRDD = sessionItemRDD.keys().mapToPair(new PairFunction<String, String, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Double> call(String item) throws Exception {
+        return new Tuple2<>(item, 1.0);
+      }
+    }).reduceByKey(new Function2<Double, Double, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Double call(Double v1, Double v2) throws Exception {
+        return v1 + v2;
+      }
+    }).filter(new Function<Tuple2<String, Double>, Boolean>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Boolean call(Tuple2<String, Double> arg0) throws Exception {
+        return arg0._2 >= 2;
+      }
+    });
+
+    return sessionItemNumRDD.leftOuterJoin(sessionItemRDD).mapToPair(new PairFunction<Tuple2<String, Tuple2<Double, Optional<String>>>, String, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Double> call(Tuple2<String, Tuple2<Double, Optional<String>>> arg0) throws Exception {
+
+        Tuple2<Double, Optional<String>> test = arg0._2;
+        Optional<String> optStr = test._2;
+        String item = "";
+        if (optStr.isPresent()) {
+          item = optStr.get();
+        }
+        return new Tuple2<>(arg0._1 + "," + item, 1.0);
+      }
+
+    });
+  }
+
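+  /**
+   * bulidSessionDatasetRDD: Query session statistics from Elasticsearch and map
+   * each session ID to the list of distinct datasets viewed in that session.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch driver
+   * @param spark the spark driver
+   * @return JavaPairRDD, key is session ID and values are viewed datasets
+   */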
+  public JavaPairRDD<String, List<String>> bulidSessionDatasetRDD(Properties props, ESDriver es, SparkDriver spark) {
+
+    List<String> result = new ArrayList<>();
+    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+    for (int n = 0; n < logIndexList.size(); n++) {
+      String logIndex = logIndexList.get(n);
+      SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(props.getProperty(MudrodConstants.SESSION_STATS_PREFIX)).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery())
+              .setSize(100).execute().actionGet();
+      while (true) {
+        for (SearchHit hit : scrollResp.getHits().getHits()) {
+          Map<String, Object> session = hit.getSource();
+          String sessionID = (String) session.get("SessionID");
+          String views = (String) session.get("views");
+          if (views != null && !"".equals(views)) {
+            String sessionItems = sessionID + ":" + views;
+            result.add(sessionItems);
+          }
+        }
+
+        scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+        if (scrollResp.getHits().getHits().length == 0) {
+          break;
+        }
+      }
+    }
+
+    JavaRDD<String> sessionRDD = spark.sc.parallelize(result);
+
+    return sessionRDD.mapToPair(new PairFunction<String, String, List<String>>() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, List<String>> call(String sessionitem) throws Exception {
+        String[] splits = sessionitem.split(":");
+        String sessionId = splits[0];
+        List<String> itemList = new ArrayList<>();
+
+        String items = splits[1];
+        String[] itemArr = items.split(",");
+        for (String item : itemArr) {
+          if (!itemList.contains(item)) {
+            itemList.add(item);
+          }
+        }
+
+        return new Tuple2<>(sessionId, itemList);
+      }
+    });
+  }
+
+  /**
+   * extractRankingTrainData: Extract ranking training data from logs stored in
+   * Elasticsearch
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @param spark
+   *          the spark driver
+   * @return ranking training data in JavaRDD format {@link RankingTrainData}
+   */
+  public JavaRDD<RankingTrainData> extractRankingTrainData(Properties props, ESDriver es, SparkDriver spark) {
+
+    List<RankingTrainData> queryList = this.extractRankingTrainData(props, es);
+    return spark.sc.parallelize(queryList);
+
+  }
+
+  /**
+   * extractRankingTrainData: Extract ranking training data from logs stored in
+   * Elasticsearch.
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @return ranking training data list {@link RankingTrainData}
+   */
+  protected List<RankingTrainData> extractRankingTrainData(Properties props, ESDriver es) {
+    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+    LOG.info("Retrieved {}", logIndexList);
+
+    List<RankingTrainData> result = new ArrayList<>();
+    for (int n = 0; n < logIndexList.size(); n++) {
+      String logIndex = logIndexList.get(n);
+      List<String> sessionIdList;
+      try {
+        sessionIdList = this.getSessions(props, es, logIndex);
+        Session session = new Session(props, es);
+        int sessionNum = sessionIdList.size();
+        for (int i = 0; i < sessionNum; i++) {
+          String[] sArr = sessionIdList.get(i).split(",");
+          List<RankingTrainData> data = session.getRankingTrainData(sArr[1], sArr[2], sArr[0]);
+          result.addAll(data);
+        }
+      } catch (Exception e) {
+        LOG.error("Error which extracting ranking train data: {}", e);
+      }
+    }
+
+    return result;
+  }
+
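+  /**
+   * extractRankingTrainDataInParallel: Extract ranking training data from logs
+   * stored in Elasticsearch, processing sessions in parallel with Spark.
+   *
+   * @param props the Mudrod configuration
+   * @param spark the spark driver
+   * @param es    the Elasticsearch driver
+   * @return ranking training data in JavaRDD format {@link RankingTrainData}
+   */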
+  protected JavaRDD<RankingTrainData> extractRankingTrainDataInParallel(Properties props, SparkDriver spark, ESDriver es) {
+
+    List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+    LOG.info("Retrieved {}", logIndexList);
+
+    List<String> sessionIdList = new ArrayList<>();
+    for (int n = 0; n < logIndexList.size(); n++) {
+      String logIndex = logIndexList.get(n);
+      List<String> tmpsessionList = this.getSessions(props, es, logIndex);
+      sessionIdList.addAll(tmpsessionList);
+    }
+
+    JavaRDD<String> sessionRDD = spark.sc.parallelize(sessionIdList, 16);
+
+    JavaRDD<RankingTrainData> trainDataRDD = sessionRDD.mapPartitions(new FlatMapFunction<Iterator<String>, RankingTrainData>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<RankingTrainData> call(Iterator<String> arg0) throws Exception {
+        ESDriver tmpES = new ESDriver(props);
+        tmpES.createBulkProcessor();
+
+        Session session = new Session(props, tmpES);
+        List<RankingTrainData> trainData = new ArrayList<>();
+        while (arg0.hasNext()) {
+          String s = arg0.next();
+          String[] sArr = s.split(",");
+          trainData.addAll(session.getRankingTrainData(sArr[1], sArr[2], sArr[0]));
+        }
+        tmpES.destroyBulkProcessor();
+        tmpES.close();
+        return trainData.iterator();
+      }
+    });
+
+    LOG.info("Clickstream number: {}", clickStreamRDD.count());
+
+    return trainDataRDD;
+  }
+
+}
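
Since these transformations run on Spark's Java API, the anonymous inner classes
above could equivalently be written as Java 8 lambdas; a sketch of
bulidDataQueryRDD in that style (intended to be behaviour-preserving):

    public JavaPairRDD<String, List<String>> bulidDataQueryRDD(JavaRDD<ClickStream> clickstreamRDD, int downloadWeight) {
      return clickstreamRDD.mapToPair(click -> {
        List<String> query = new ArrayList<>();
        // download behavior is weighted more heavily than viewing
        int weight = click.isDownload() ? downloadWeight : 1;
        for (int i = 0; i < weight; i++) {
          query.add(click.getKeyWords());
        }
        return new Tuple2<>(click.getViewDataset(), query);
      }).reduceByKey((v1, v2) -> {
        List<String> merged = new ArrayList<>(v1);
        merged.addAll(v2);
        return merged;
      });
    }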

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java
new file mode 100644
index 0000000..958e184
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionNode.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * ClassName: SessionNode Function: Functions related to a node in a session
+ * tree structure.
+ */
+public class SessionNode {
+  // id: Node ID
+  protected String id;
+  // value: Node value
+  protected String value;
+  // parent: Parent node of this node
+  protected SessionNode parent;
+  // children: Child nodes of this node
+  protected List<SessionNode> children = new ArrayList<>();
+  // time: request time of node
+  protected String time;
+  // request: request url of this node
+  protected String request;
+  // referer: previous request url of this node
+  protected String referer;
+  // seq: sequence of this node
+  protected int seq;
+  // key: type of this node extracted from url, including three types -
+  // dataset,datasetlist,ftp
+  protected String key;
+  // logType: log types of this node, including two types - PO.DAAC, ftp
+  protected String logType;
+  // search: query extracted from this node
+  protected String search;
+  // filter: filter facets extracted from this node
+  protected Map<String, String> filter;
+  // datasetId: viewed/downloaded data set ID
+  protected String datasetId;
+
+  public SessionNode() {
+
+  }
+
+  /**
+   * Creates a new instance of SessionNode.
+   *
+   * @param request request url
+   * @param logType including two types - PO.DAAC, ftp
+   * @param referer previous request url
+   * @param time    request time of node
+   * @param seq     sequence of this node
+   */
+  public SessionNode(String request, String logType, String referer, String time, int seq) {
+    this.logType = logType;
+    this.time = time;
+    this.seq = seq;
+    this.setRequest(request);
+    this.setReferer(referer);
+    this.setKey(request, logType);
+  }
+
+  /**
+   * setReferer: Set previous request url of this node
+   *
+   * @param referer previous request url
+   */
+  public void setReferer(String referer) {
+    if (referer == null) {
+      this.referer = "";
+      return;
+    }
+    this.referer = referer.toLowerCase().replace("http://podaac.jpl.nasa.gov", "");
+  }
+
+  /**
+   * setRequest: Set request url of this node
+   *
+   * @param req request url
+   */
+  public void setRequest(String req) {
+    this.request = req;
+    if (this.logType.equals("PO.DAAC")) {
+      this.parseRequest(req);
+    }
+  }
+
+  /**
+   * getChildren:Get child nodes of this node
+   *
+   * @return child nodes
+   */
+  public List<SessionNode> getChildren() {
+    return this.children;
+  }
+
+  /**
+   * setChildren: Set child nodes of this node
+   *
+   * @param children child nodes of this node
+   */
+  public void setChildren(List<SessionNode> children) {
+    this.children = children;
+  }
+
+  /**
+   * addChildren: Add a children node
+   *
+   * @param node session node
+   */
+  public void addChildren(SessionNode node) {
+    this.children.add(node);
+  }
+
+  /**
+   * getId:Get node ID
+   *
+   * @return node ID of this node
+   */
+  public String getId() {
+    return this.id;
+  }
+
+  /**
+   * bSame: Compare this node with another node
+   *
+   * @param node {@link SessionNode}
+   * @return boolean value, true means the two nodes are the same
+   */
+  public Boolean bSame(SessionNode node) {
+    return this.request.equals(node.request);
+  }
+
+  /**
+   * setKey:Set request type which contains three categories -
+   * dataset,datasetlist,ftp
+   *
+   * @param request request url
+   * @param logType url type
+   */
+  public void setKey(String request, String logType) {
+    this.key = "";
+    String datasetlist = "/datasetlist?";
+    String dataset = "/dataset/";
+    if (logType.equals("ftp")) {
+      this.key = "ftp";
+    } else if (logType.equals("root")) {
+      this.key = "root";
+    } else {
+      if (request.contains(datasetlist)) {
+        this.key = "datasetlist";
+      } else if (request.contains(dataset) /* || request.contains(granule) */) {
+        this.key = "dataset";
+      }
+    }
+  }
+
+  /**
+   * getKey:Get request type which contains three categories -
+   * dataset,datasetlist,ftp
+   *
+   * @return request url type of this node
+   */
+  public String getKey() {
+    return this.key;
+  }
+
+  /**
+   * getRequest:Get node request
+   *
+   * @return request url of this node
+   */
+  public String getRequest() {
+    return this.request;
+  }
+
+  /**
+   * getReferer:Get previous request url of this node
+   *
+   * @return previous request url of this node
+   */
+  public String getReferer() {
+    return this.referer;
+  }
+
+  /**
+   * getParent:Get parent node of this node
+   *
+   * @return parent node of this node
+   */
+  public SessionNode getParent() {
+    return this.parent;
+  }
+
+  /**
+   * setParent: Set parent node of this node
+   *
+   * @param parent the previous request node of this node
+   */
+  public void setParent(SessionNode parent) {
+    this.parent = parent;
+  }
+
+  /**
+   * getSearch:Get query of this node
+   *
+   * @return search query of this node
+   */
+  public String getSearch() {
+    return this.search;
+  }
+
+  /**
+   * getFilter:Get filter facets of this node
+   *
+   * @return filter values of this node
+   */
+  public Map<String, String> getFilter() {
+    return this.filter;
+  }
+
+  /**
+   * getDatasetId:Get data set ID of this node
+   *
+   * @return viewing/downloading data set of this node
+   */
+  public String getDatasetId() {
+    return this.datasetId;
+  }
+
+  /**
+   * getSeq:Get sequence of this node
+   *
+   * @return request sequence of this node
+   */
+  public int getSeq() {
+    return this.seq;
+  }
+
+  /**
+   * getFilterStr:Get filter facets of this node
+   *
+   * @return filters values of this node
+   */
+  public String getFilterStr() {
+    StringBuilder str = new StringBuilder();
+    for (Map.Entry<String, String> entry : this.filter.entrySet()) {
+      str.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
+    }
+    if (str.length() > 0) {
+      // drop the trailing comma
+      str.setLength(str.length() - 1);
+    }
+    return str.toString();
+  }
+
+  /**
+   * parseRequest:Parse request to extract request type
+   *
+   * @param request request url of this node
+   */
+  public void parseRequest(String request) {
+    Pattern pattern = Pattern.compile("get (.*?) http/*");
+    Matcher matcher = pattern.matcher(request.trim().toLowerCase());
+    while (matcher.find()) {
+      request = matcher.group(1);
+    }
+    if (request.contains("/dataset/")) {
+      this.parseDatasetId(request);
+    }
+
+    this.request = request.toLowerCase();
+  }
+
+  /**
+   * parseFilterParams:Parse filter facets information
+   *
+   * @param params filter key value pairs of this node
+   */
+  private void parseFilterParams(Map<String, String> params) {
+    this.filter = new HashMap<>();
+    if (params.containsKey("ids")) {
+      String idsStr = params.get("ids");
+      if (!idsStr.equals("")) {
+        idsStr = URLDecoder.decode(idsStr);
+        String[] ids = idsStr.split(":");
+        String valueStr = params.get("values");
+        if (valueStr != null) {
+          valueStr = URLDecoder.decode(valueStr);
+          String[] values = valueStr.split(":");
+          // guard against a values array shorter than the ids array
+          int size = Math.min(ids.length, values.length);
+          for (int i = 0; i < size; i++) {
+            this.filter.put(ids[i], values[i]);
+          }
+        }
+      }
+    }
+
+    if (!this.search.equals("")) {
+      this.filter.put("search", this.search);
+    }
+  }
+
+  /**
+   * parseDatasetId:Parse Request to extract data set ID
+   *
+   * @param request request url
+   */
+  public void parseDatasetId(String request) {
+    try {
+      request = URLDecoder.decode(request, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      e.printStackTrace();
+    }
+    String[] twoparts = request.split("[?]");
+    String[] parts = twoparts[0].split("/");
+    if (parts.length <= 2) {
+      return;
+    }
+    this.datasetId = parts[2];
+  }
+}
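
A small sketch of how a raw access-log line flows through SessionNode (the log
line and dataset ID are hypothetical):

    SessionNode node = new SessionNode(
        "GET /dataset/MODIS_AQUA_L3_SST?ids=sensor HTTP/1.1", "PO.DAAC", "-", "2017-01-01T00:00:00.000Z", 1);
    node.getKey();       // "dataset" (derived from the /dataset/ path)
    node.getRequest();   // "/dataset/modis_aqua_l3_sst?ids=sensor" (lower-cased, method and protocol stripped)
    node.getDatasetId(); // "modis_aqua_l3_sst"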

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java
new file mode 100644
index 0000000..46c8d0c
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/SessionTree.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * ClassName: SessionTree Function: Convert request list in a session to a tree
+ */
+public class SessionTree extends MudrodAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(SessionTree.class);
+  // size: node numbers in the session tree
+  public int size = 0;
+  // root: root node of session tree
+  protected SessionNode root = null;
+  // binsert: whether insertion has started (set once a "datasetlist" node is seen)
+  public boolean binsert = false;
+  // tmpnode: the most recently inserted node
+  public SessionNode tmpnode;
+  // latestDatasetnode: the latest inserted node whose key is "dataset"
+  public SessionNode latestDatasetnode;
+  // sessionID: session ID
+  private String sessionID;
+  // cleanupType: session type in Elasticsearch
+  private String cleanupType;
+
+  /**
+   * Creates a new instance of SessionTree.
+   *
+   * @param props       the Mudrod configuration
+   * @param es          the Elasticsearch driver
+   * @param rootData    root node of the tree
+   * @param sessionID   session ID
+   * @param cleanupType session type
+   */
+  public SessionTree(Properties props, ESDriver es, SessionNode rootData, String sessionID, String cleanupType) {
+    super(props, es, null);
+    root = new SessionNode("root", "root", "", "", 0);
+    tmpnode = root;
+    this.sessionID = sessionID;
+    this.cleanupType = cleanupType;
+  }
+
+  /**
+   * Creates a new instance of SessionTree.
+   *
+   * @param props       the Mudrod configuration
+   * @param es          the Elasticsearch driver
+   * @param sessionID   session ID
+   * @param cleanupType session type
+   */
+  public SessionTree(Properties props, ESDriver es, String sessionID, String cleanupType) {
+    super(props, es, null);
+    root = new SessionNode("root", "root", "", "", 0);
+    root.setParent(root);
+    tmpnode = root;
+    this.sessionID = sessionID;
+    this.cleanupType = cleanupType;
+  }
+
+  /**
+   * insert: insert a node into the session tree.
+   *
+   * @param node {@link SessionNode}
+   * @return session node
+   */
+  public SessionNode insert(SessionNode node) {
+    // begin with datasetlist
+    if (node.getKey().equals("datasetlist")) {
+      this.binsert = true;
+    }
+    if (!this.binsert) {
+      return null;
+    }
+    // remove unrelated node
+    if (!node.getKey().equals("datasetlist") && !node.getKey().equals("dataset") && !node.getKey().equals("ftp")) {
+      return null;
+    }
+    // remove duplicated click
+    if (node.getRequest().equals(tmpnode.getRequest())) {
+      return null;
+    }
+    // search insert node
+    SessionNode parentnode = this.searchParentNode(node);
+    if (parentnode == null) {
+      return null;
+    }
+    node.setParent(parentnode);
+    parentnode.addChildren(node);
+
+    // record insert node
+    tmpnode = node;
+    if ("dataset".equals(node.getKey())) {
+      latestDatasetnode = node;
+    }
+
+    size++;
+    return node;
+  }
+
+  /**
+   * printTree: Print session tree
+   *
+   * @param node root node of the session tree
+   */
+  public void printTree(SessionNode node) {
+    LOG.info("node: {} \n", node.getRequest());
+    if (node.children.isEmpty()) {
+      for (int i = 0; i < node.children.size(); i++) {
+        printTree(node.children.get(i));
+      }
+    }
+  }
+
+  /**
+   * TreeToJson: Convert the session tree to Json object
+   *
+   * @param node node of the session tree
+   * @return tree content in Json format
+   */
+  public JsonObject treeToJson(SessionNode node) {
+    Gson gson = new Gson();
+    JsonObject json = new JsonObject();
+
+    json.addProperty("seq", node.getSeq());
+    if ("datasetlist".equals(node.getKey())) {
+      json.addProperty("icon", "./resources/images/searching.png");
+      json.addProperty("name", node.getRequest());
+    } else if ("dataset".equals(node.getKey())) {
+      json.addProperty("icon", "./resources/images/viewing.png");
+      json.addProperty("name", node.getDatasetId());
+    } else if ("ftp".equals(node.getKey())) {
+      json.addProperty("icon", "./resources/images/downloading.png");
+      json.addProperty("name", node.getRequest());
+    } else if ("root".equals(node.getKey())) {
+      json.addProperty("name", "");
+      json.addProperty("icon", "./resources/images/users.png");
+    }
+
+    if (!node.children.isEmpty()) {
+      List<JsonObject> jsonChildren = new ArrayList<>();
+      for (int i = 0; i < node.children.size(); i++) {
+        JsonObject jsonChild = treeToJson(node.children.get(i));
+        jsonChildren.add(jsonChild);
+      }
+      JsonElement jsonElement = gson.toJsonTree(jsonChildren);
+      json.add("children", jsonElement);
+    }
+
+    return json;
+  }
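+  // Example output shape (hypothetical values):
+  // {"seq":2,"icon":"./resources/images/viewing.png","name":"some_dataset_id","children":[...]}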
+
+  /**
+   * getClickStreamList: Get click stream list in the session
+   *
+   * @return {@link ClickStream}
+   */
+  public List<ClickStream> getClickStreamList() {
+
+    List<ClickStream> clickthroughs = new ArrayList<>();
+    List<SessionNode> viewnodes = this.getViewNodes(this.root);
+    for (int i = 0; i < viewnodes.size(); i++) {
+
+      SessionNode viewnode = viewnodes.get(i);
+      SessionNode parent = viewnode.getParent();
+      List<SessionNode> children = viewnode.getChildren();
+
+      if (!"datasetlist".equals(parent.getKey())) {
+        continue;
+      }
+
+      RequestUrl requestURL = new RequestUrl();
+      String viewquery = "";
+      try {
+        String infoStr = requestURL.getSearchInfo(viewnode.getRequest());
+        viewquery = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+      } catch (UnsupportedEncodingException | InterruptedException | ExecutionException e) {
+        LOG.warn("Exception getting search info. Ignoring...", e);
+      }
+
+      String dataset = viewnode.getDatasetId();
+      boolean download = false;
+      for (int j = 0; j < children.size(); j++) {
+        SessionNode child = children.get(j);
+        if ("ftp".equals(child.getKey())) {
+          download = true;
+          break;
+        }
+      }
+
+      if (viewquery != null && !"".equals(viewquery)) {
+        String[] queries = viewquery.trim().split(",");
+        if (queries.length > 0) {
+          for (int k = 0; k < queries.length; k++) {
+            ClickStream data = new ClickStream(queries[k], dataset, download);
+            data.setSessionId(this.sessionID);
+            data.setType(this.cleanupType);
+            clickthroughs.add(data);
+          }
+        }
+      }
+    }
+
+    return clickthroughs;
+  }
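+
+  /*
+   * For example (hypothetical session): a search for "ocean wind" followed by
+   * viewing dataset D and downloading it via FTP yields one
+   * ClickStream("ocean wind", "D", true) entry tagged with this session's ID.
+   */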
+
+  /**
+   * searchParentNode: Get the parent node of a session node. A "datasetlist"
+   * node attaches to the node matching its referer (falling back to the
+   * root), a "dataset" node attaches to its referer match (or null), and an
+   * "ftp" node attaches to the most recently seen dataset node.
+   *
+   * @param node {@link SessionNode}
+   * @return the parent {@link SessionNode}, or null if none can be found
+   */
+  private SessionNode searchParentNode(SessionNode node) {
+
+    String nodeKey = node.getKey();
+
+    if ("datasetlist".equals(nodeKey)) {
+      if ("-".equals(node.getReferer())) {
+        return root;
+      } else {
+        SessionNode tmp = this.findLatestRefer(tmpnode, node.getReferer());
+        if (tmp == null) {
+          return root;
+        } else {
+          return tmp;
+        }
+      }
+    } else if ("dataset".equals(nodeKey)) {
+      if ("-".equals(node.getReferer())) {
+        return null;
+      } else {
+        return this.findLatestRefer(tmpnode, node.getReferer());
+      }
+    } else if ("ftp".equals(nodeKey)) {
+      return latestDatasetnode;
+    }
+
+    return tmpnode;
+  }
+
+  /**
+   * findLatestRefer: Find the most recently visited node whose request URL
+   * equals the referer URL of a session node, walking up from the given node.
+   *
+   * @param node  {@link SessionNode} to start the upward search from
+   * @param refer referer URL to match against request URLs
+   * @return the matching {@link SessionNode}, or null if the root is reached
+   */
+  private SessionNode findLatestRefer(SessionNode node, String refer) {
+    while (true) {
+      if ("root".equals(node.getKey())) {
+        return null;
+      }
+      SessionNode parentNode = node.getParent();
+      if (refer.equals(parentNode.getRequest())) {
+        return parentNode;
+      }
+
+      SessionNode tmp = this.iterChild(parentNode, refer);
+      if (tmp == null) {
+        node = parentNode;
+        continue;
+      } else {
+        return tmp;
+      }
+    }
+  }
+
+  /**
+   * iterChild: Search the descendants of a node, most recent child first, for
+   * a leaf whose request URL equals the given referer URL.
+   *
+   * @param start node whose subtree is searched
+   * @param refer referer URL to match
+   * @return the matching leaf {@link SessionNode}, or null if none is found
+   */
+  private SessionNode iterChild(SessionNode start, String refer) {
+    List<SessionNode> children = start.getChildren();
+    for (int i = children.size() - 1; i >= 0; i--) {
+      SessionNode tmp = children.get(i);
+      if (tmp.getChildren().isEmpty()) {
+        if (refer.equals(tmp.getRequest())) {
+          return tmp;
+        }
+      } else {
+        // Recurse into non-leaf children and propagate any match upwards.
+        SessionNode found = iterChild(tmp, refer);
+        if (found != null) {
+          return found;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * check: Determine whether any node in a list of children has the given key.
+   *
+   * @param children child nodes to inspect
+   * @param str      key to look for
+   * @return true if a child with the given key exists
+   */
+  private boolean check(List<SessionNode> children, String str) {
+    for (int i = 0; i < children.size(); i++) {
+      if (children.get(i).key.equals(str)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * insertHelperChildren: Try to insert an entry beneath each of the given
+   * children in turn, stopping at the first subtree that accepts it.
+   *
+   * @param entry    node to insert
+   * @param children candidate subtrees
+   * @return true if the entry was inserted
+   */
+  private boolean insertHelperChildren(SessionNode entry, List<SessionNode> children) {
+    for (int i = 0; i < children.size(); i++) {
+      boolean result = insertHelper(entry, children.get(i));
+      if (result) {
+        return result;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * insertHelper: Recursively find the insertion point for an entry:
+   * "datasetlist" and "dataset" entries attach beneath the deepest
+   * "datasetlist" node, while "ftp" entries attach beneath a "dataset" node.
+   *
+   * @param entry node to insert
+   * @param node  subtree root to search
+   * @return true if the entry was inserted
+   */
+  private boolean insertHelper(SessionNode entry, SessionNode node) {
+    if ("datasetlist".equals(entry.key) || "dataset".equals(entry.key)) {
+      if ("datasetlist".equals(node.key)) {
+        if (node.children.isEmpty()) {
+          node.children.add(entry);
+          return true;
+        } else {
+          boolean flag = check(node.children, "datasetlist");
+          if (!flag) {
+            node.children.add(entry);
+            return true;
+          } else {
+            insertHelperChildren(entry, node.children);
+          }
+        }
+      } else {
+        insertHelperChildren(entry, node.children);
+      }
+    } else if ("ftp".equals(entry.key)) {
+      if ("dataset".equals(node.key)) {
+        if (node.children.isEmpty()) {
+          node.children.add(entry);
+          return true;
+        } else {
+          boolean flag = check(node.children, "dataset");
+          if (!flag) {
+            node.children.add(entry);
+            return true;
+          } else {
+            insertHelperChildren(entry, node.children);
+          }
+        }
+      } else {
+        insertHelperChildren(entry, node.children);
+      }
+    }
+
+    return false;
+  }
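+
+  /*
+   * Together, the helpers above maintain a tree of the shape (hypothetical
+   * session): root -> datasetlist (search) -> dataset (view) -> ftp
+   * (download), so that each action hangs off the action that led to it.
+   */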
+
+  /**
+   * getViewNodes: Collect all nodes in a subtree whose key is "dataset".
+   *
+   * @param node subtree root to search
+   * @return a list of matching session nodes
+   */
+  private List<SessionNode> getViewNodes(SessionNode node) {
+
+    List<SessionNode> viewnodes = new ArrayList<>();
+    if ("dataset".equals(node.getKey())) {
+      viewnodes.add(node);
+    }
+
+    if (!node.children.isEmpty()) {
+      for (int i = 0; i < node.children.size(); i++) {
+        SessionNode childNode = node.children.get(i);
+        viewnodes.addAll(getViewNodes(childNode));
+      }
+    }
+
+    return viewnodes;
+  }
+
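+  /**
+   * getQueryNodes: Collect all "datasetlist" (search) nodes in a subtree.
+   */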
+  private List<SessionNode> getQueryNodes(SessionNode node) {
+    return this.getNodes(node, "datasetlist");
+  }
+
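+  /**
+   * getNodes: Collect all nodes in a subtree whose key equals the given key.
+   */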
+  private List<SessionNode> getNodes(SessionNode node, String nodeKey) {
+
+    List<SessionNode> nodes = new ArrayList<>();
+    if (node.getKey().equals(nodeKey)) {
+      nodes.add(node);
+    }
+
+    if (!node.children.isEmpty()) {
+      for (int i = 0; i < node.children.size(); i++) {
+        SessionNode childNode = node.children.get(i);
+        nodes.addAll(getNodes(childNode, nodeKey));
+      }
+    }
+
+    return nodes;
+  }
+
+  /**
+   * Obtain the ranking training data.
+   *
+   * @param indexName   the index from which to obtain the data
+   * @param sessionID   a valid session identifier
+   * @return a list of {@link RankingTrainData} instances
+   * @throws UnsupportedEncodingException if there is an error whilst
+   *                                      processing the ranking training data.
+   */
+  public List<RankingTrainData> getRankingTrainData(String indexName, String sessionID) throws UnsupportedEncodingException {
+
+    List<RankingTrainData> trainDatas = new ArrayList<>();
+
+    List<SessionNode> queryNodes = this.getQueryNodes(this.root);
+    for (int i = 0; i < queryNodes.size(); i++) {
+      SessionNode querynode = queryNodes.get(i);
+      List<SessionNode> children = querynode.getChildren();
+
+      LinkedHashMap<String, Boolean> datasetOpt = new LinkedHashMap<>();
+      int ndownload = 0;
+      for (int j = 0; j < children.size(); j++) {
+        SessionNode node = children.get(j);
+        if ("dataset".equals(node.getKey())) {
+          Boolean bDownload = false;
+          List<SessionNode> nodeChildren = node.getChildren();
+          int childSize = nodeChildren.size();
+          for (int k = 0; k < childSize; k++) {
+            if ("ftp".equals(nodeChildren.get(k).getKey())) {
+              bDownload = true;
+              ndownload += 1;
+              break;
+            }
+          }
+          datasetOpt.put(node.datasetId, bDownload);
+        }
+      }
+
+      // method 1: downloaded datasets take priority over datasets that were only viewed
+      if (datasetOpt.size() > 1 && ndownload > 0) {
+        // query
+        RequestUrl requestURL = new RequestUrl();
+        String queryUrl = querynode.getRequest();
+        String infoStr = requestURL.getSearchInfo(queryUrl);
+        String query = null;
+        try {
+          query = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException("Error performing custom analyzing", e);
+        }
+        Map<String, String> filter = RequestUrl.getFilterInfo(queryUrl);
+
+        for (String datasetA : datasetOpt.keySet()) {
+          Boolean bDownloadA = datasetOpt.get(datasetA);
+          if (bDownloadA) {
+            for (String datasetB : datasetOpt.keySet()) {
+              Boolean bDownloadB = datasetOpt.get(datasetB);
+              if (!bDownloadB) {
+
+                String[] queries = query.split(",");
+                for (int l = 0; l < queries.length; l++) {
+                  RankingTrainData trainData = new RankingTrainData(queries[l], datasetA, datasetB);
+
+                  trainData.setSessionId(this.sessionID);
+                  trainData.setIndex(indexName);
+                  trainData.setFilter(filter);
+                  trainDatas.add(trainData);
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+
+    return trainDatas;
+  }
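+
+  /*
+   * For example (hypothetical session): if a query returned datasets A and B
+   * and only A was downloaded, one RankingTrainData(query, "A", "B") pair is
+   * produced per comma-separated query term, encoding "A should rank above B".
+   */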
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java
new file mode 100644
index 0000000..e3392e4
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/WebLog.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
+
+import java.io.Serializable;
+
+/**
+ * This class represents an Apache access log line. See
+ * http://httpd.apache.org/docs/2.2/logs.html for more details.
+ */
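+/*
+ * A typical common-log-format line modelled by this class (sample values,
+ * for illustration only):
+ *
+ *   127.0.0.1 - - [27/Oct/2017:22:34:55 -0700] "GET /datasetlist?search=ocean HTTP/1.1" 200 2326
+ */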
+public class WebLog implements Serializable {
+  String LogType;
+  String IP;
+  String Time;
+  String Request;
+  double Bytes;
+
+  public String getLogType() {
+    return this.LogType;
+  }
+
+  public String getIP() {
+    return this.IP;
+  }
+
+  public String getTime() {
+    return this.Time;
+  }
+
+  public String getRequest() {
+    return this.Request;
+  }
+
+  public double getBytes() {
+    return this.Bytes;
+  }
+
+  public WebLog() {
+
+  }
+
+  private static final String[] MONTHS = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
+
+  /**
+   * SwithtoNum: Replace the first abbreviated month name found in an Apache
+   * log timestamp with its one-based month number, e.g. "27/Oct/2017"
+   * becomes "27/10/2017".
+   */
+  public static String SwithtoNum(String time) {
+    for (int i = 0; i < MONTHS.length; i++) {
+      if (time.contains(MONTHS[i])) {
+        return time.replace(MONTHS[i], Integer.toString(i + 1));
+      }
+    }
+    return time;
+  }
+
+  /**
+   * checknull: Returns false when the supplied string is the empty JSON
+   * object "{}", true otherwise.
+   */
+  public static boolean checknull(String s) {
+    return !s.equals("{}");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java
new file mode 100644
index 0000000..7aa9898
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/structure/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the data structures needed for web log analysis.
+ */
+package gov.nasa.jpl.mudrod.weblog.structure;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/config.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/config.xml b/core/src/main/resources/config.xml
new file mode 100644
index 0000000..62ac7a6
--- /dev/null
+++ b/core/src/main/resources/config.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS"
+  BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+  express or implied. See the License for the specific language
+  governing permissions and limitations under the License.
+-->
+<Config>
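+    <!--
+     Note: each para element below is surfaced at runtime as a Java
+     properties entry keyed by its name attribute; elsewhere in this commit
+     values are read as, e.g., props.getProperty("indexName").
+     -->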
+    <para name="Cleanup_type_prefix">cleanupLog</para>
+
+    <para name="clickStreamLinkageType">ClickStreamLinkage</para>
+
+    <para name="clickStreamMatrixType">clickstreamMatrix</para>
+
+    <para name="clickstreamSVDDimension">50</para>
+
+    <para name="clickStream_w">2</para>
+
+    <para name="commentType">comment</para>
+
+    <para name="downloadf">100</para>
+
+    <para name="downloadWeight">3</para>
+
+    <para name="clusterName">MudrodES</para>
+
+    <para name="ES_Transport_TCP_Port">9300</para>
+
+    <para name="ES_unicast_hosts">127.0.0.1</para>
+
+    <para name="ES_HTTP_port">9200</para>
+
+    <para name="indexName">mudrod</para>
+
+    <para name="ftpPrefix">FTP.</para>
+    
+    <para name="FTP_type_prefix">rawftp</para>
+
+    <para name="HTTP_type_prefix">rawhttp</para>
+
+    <para name="httpPrefix">WWW.</para>
+
+    <para name="logIndexName">podaaclog</para>
+
+    <para name="metadataLinkageType">MetadataLinkage</para>
+
+    <para name="metadataSVDDimension">50</para>
+
+    <para name="metadataurl">null</para>
+
+    <para name="metadata_w">1</para>
+
+    <para name="mini_userHistory">5</para>
+
+    <!--
+    The ontology service implementation. Possible values include
+     EsipPortal - EsipPortalOntology
+     EsipCOR - EsipCOROntology
+     Local - gov.nasa.jpl.mudrod.ontology.process.Local
+     -->
+    <para name="mudrod.ontology.implementation">Local</para>
+
+    <para name="ontologyLinkageType">SWEETLinkage</para>
+
+    <para name="ontology_w">2</para>
+
+    <!--
+     Log processing type. Possible values include
+     'sequential' or 'parallel'.
+     -->
+    <para name="processingType">parallel</para>
+
+    <para name="raw_metadataType">RawMetadata</para>
+
+    <para name="searchf">100</para>
+
+    <para name="sendingrate">30</para>
+
+    <para name="SessionPort">8080</para>
+
+    <para name="SessionStats_prefix">sessionstats</para>
+
+    <para name="SessionUrl">/mudrod-service/session.html</para>
+
+    <!-- The name of your application. This will appear in the UI and in log data.-->
+    <para name="spark.app.name">MudrodSparkApp</para>
+
+    <!--
+    The default Spark cluster manager to connect to. See the list of allowed master URL's.
+    For more information, consult http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
+    -->
+    <para name="spark.master">local[4]</para>
+
+    <!-- ${svmSgdModel.value} is resolved at build time. See the property in core/pom.xml for the value -->
+    <para name="svmSgdModel">${svmSgdModel.value}.zip</para>
+    
+    <para name="timegap">600</para>
+    
+    <para name="userHistoryLinkageType">UserHistoryLinkage</para>
+    
+    <para name="userHistory_w">2</para>
+
+    <para name="viewf">200</para>
+
+
+
+    <!-- FOLLOWING NEEDS TO BE ADDED TO MudrodConstants.java -->
+    <para name="recom_metadataType">RecomMetadata</para>
+    <!-- recommendation -->
+    <para name="metadataTermTFIDFSimType">MetadataTermTFIDFSim</para>
+    <para name="metadataWordTFIDFSimType">MetadataWordTFIDFSim</para>
+    <para name="metadataCodeSimType">MetadataCodeSim</para>
+    <para name="metadataSessionBasedSimType">MetadataSBSim</para>
+    <para name="metadataTopicSimType">MetadataTBSim</para>
+    <!--
+    Log processing parallel optimization type. Possible values include
+    default - MudrodConstants.PARALLEL_OPTIMIZATION_DEFAULT
+    repartition - MudrodConstants.PARALLEL_OPTIMIZATION_REPARTITION
+    -->
+    <para name="parallelOptimization">repartition</para>
+
+</Config>

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/elastic_mappings.json
----------------------------------------------------------------------
diff --git a/core/src/main/resources/elastic_mappings.json b/core/src/main/resources/elastic_mappings.json
new file mode 100644
index 0000000..685f49e
--- /dev/null
+++ b/core/src/main/resources/elastic_mappings.json
@@ -0,0 +1,68 @@
+{
+  "_default_": {
+    "properties": {
+      "keywords": {
+        "type": "text",
+        "analyzer": "csv",
+        "fielddata": true
+      },
+      "views": {
+        "type": "string",
+        "analyzer": "csv"
+      },
+      "downloads": {
+        "type": "string",
+        "analyzer": "csv"
+      },
+      "RequestUrl": {
+        "type": "string",
+        "include_in_all": false,
+        "index": "no"
+      },
+      "IP": {
+        "type": "keyword",
+        "index": "not_analyzed"
+      },
+      "Browser": {
+        "type": "string",
+        "include_in_all": false,
+        "index": "no"
+      },
+      "SessionURL": {
+        "type": "string",
+        "include_in_all": false,
+        "index": "no"
+      },
+      "Referer": {
+        "type": "string",
+        "index": "not_analyzed"
+      },
+      "SessionID": {
+        "type": "string",
+        "index": "not_analyzed"
+      },
+      "Response": {
+        "type": "string",
+        "include_in_all": false,
+        "index": "no"
+      },
+      "Request": {
+        "type": "string",
+        "include_in_all": false,
+        "index": "no"
+      },
+      "Coordinates": {
+        "type": "geo_point",
+        "include_in_all": false,
+        "index": "no"
+      },
+      "LogType": {
+        "type": "string",
+        "index": "not_analyzed"
+      },
+      "Dataset-Metadata": {
+        "type": "completion"
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/elastic_settings.json
----------------------------------------------------------------------
diff --git a/core/src/main/resources/elastic_settings.json b/core/src/main/resources/elastic_settings.json
new file mode 100644
index 0000000..f5faa3e
--- /dev/null
+++ b/core/src/main/resources/elastic_settings.json
@@ -0,0 +1,36 @@
+{
+  "index": {
+    "number_of_replicas": 0,
+    "refresh_interval": "-1",
+    "number_of_shards": "5",
+    "translog.flush_threshold_size": "1g",
+    "translog.sync_interval": "30s",
+    "warmer.enabled": "false"
+  },
+  "analysis": {
+    "filter": {
+      "cody_stop": {
+        "type": "stop",
+        "stopwords": "_english_"
+      },
+      "cody_stemmer": {
+        "type": "stemmer",
+        "language": "light_english"
+      }
+    },
+    "analyzer": {
+      "cody": {
+        "tokenizer": "standard",
+        "filter": [
+          "lowercase",
+          "cody_stop",
+          "cody_stemmer"
+        ]
+      },
+      "csv": {
+        "type": "pattern",
+        "pattern": ","
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/_SUCCESS
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/_SUCCESS b/core/src/main/resources/javaSVMWithSGDModel/data/_SUCCESS
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata b/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata
new file mode 100644
index 0000000..cafbf1b
Binary files /dev/null and b/core/src/main/resources/javaSVMWithSGDModel/data/_common_metadata differ

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/_metadata
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/_metadata b/core/src/main/resources/javaSVMWithSGDModel/data/_metadata
new file mode 100644
index 0000000..6bfec98
Binary files /dev/null and b/core/src/main/resources/javaSVMWithSGDModel/data/_metadata differ

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet b/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet
new file mode 100644
index 0000000..5033301
Binary files /dev/null and b/core/src/main/resources/javaSVMWithSGDModel/data/part-r-00000-e008ae03-6b61-4931-ba29-27304de5a584.gz.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/metadata/_SUCCESS
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/metadata/_SUCCESS b/core/src/main/resources/javaSVMWithSGDModel/metadata/_SUCCESS
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000
----------------------------------------------------------------------
diff --git a/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000 b/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000
new file mode 100644
index 0000000..c972cbe
--- /dev/null
+++ b/core/src/main/resources/javaSVMWithSGDModel/metadata/part-00000
@@ -0,0 +1 @@
+{"class":"org.apache.spark.mllib.classification.SVMModel","version":"1.0","numFeatures":6,"numClasses":2}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/log4j.properties b/core/src/main/resources/log4j.properties
new file mode 100644
index 0000000..1e6e84d
--- /dev/null
+++ b/core/src/main/resources/log4j.properties
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS"
+# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language
+# governing permissions and limitations under the License.
+# Define some default values that can be overridden by system properties
+# Logging Threshold
+mudrod.root.logger=INFO,DRFA,stdout
+mudrod.log.dir=.
+mudrod.log.file=mudrod.log
+log4j.threshold=ALL
+# RootLogger - DailyRollingFileAppender
+log4j.rootLogger=${mudrod.root.logger}
+#special logging requirements for some commandline tools
+log4j.logger.MudrodEngine=INFO,cmdstdout
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${mudrod.log.dir}/${mudrod.log.file}
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+# Debugging Pattern format: Date LogLevel LoggerName (FileName:MethodName:LineNo) LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# stdout
+# Add *stdout* to rootlogger above if you want to use this 
+#
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# plain layout used for commandline tools to output to console
+#
+log4j.appender.cmdstdout=org.apache.log4j.ConsoleAppender
+log4j.appender.cmdstdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.cmdstdout.layout.ConversionPattern=%m%n
+#
+# Rolling File Appender
+#
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+# Custom Logging levels
+log4j.logger.akka=WARN
+log4j.logger.org.apache=WARN
+log4j.logger.gov.nasa.jpl.mudrod=INFO

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/log4j2.properties b/core/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..1e6e84d
--- /dev/null
+++ b/core/src/main/resources/log4j2.properties
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS"
+# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+# express or implied. See the License for the specific language
+# governing permissions and limitations under the License.
+# Define some default values that can be overridden by system properties
+# Logging Threshold
+mudrod.root.logger=INFO,DRFA,stdout
+mudrod.log.dir=.
+mudrod.log.file=mudrod.log
+log4j.threshold=ALL
+# RootLogger - DailyRollingFileAppender
+log4j.rootLogger=${mudrod.root.logger}
+#special logging requirements for some commandline tools
+log4j.logger.MudrodEngine=INFO,cmdstdout
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${mudrod.log.dir}/${mudrod.log.file}
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+# Debugging Pattern format: Date LogLevel LoggerName (FileName:MethodName:LineNo) LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# stdout
+# Add *stdout* to rootlogger above if you want to use this 
+#
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+#
+# plain layout used for commandline tools to output to console
+#
+log4j.appender.cmdstdout=org.apache.log4j.ConsoleAppender
+log4j.appender.cmdstdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.cmdstdout.layout.ConversionPattern=%m%n
+#
+# Rolling File Appender
+#
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+# Custom Logging levels
+log4j.logger.akka=WARN
+log4j.logger.org.apache=WARN
+log4j.logger.gov.nasa.jpl.mudrod=INFO