You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sdap.apache.org by GitBox <gi...@apache.org> on 2018/03/13 14:52:24 UTC

[GitHub] Yongyao closed pull request #7: SDAP-35 Overhaul MUDROD configuration

Yongyao closed pull request #7: SDAP-35 Overhaul MUDROD configuration
URL: https://github.com/apache/incubator-sdap-mudrod/pull/7
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (fork-based) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/.gitignore b/.gitignore
index 6b975b5..3009cbf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,8 @@ service/bin/
 core/lib
 service/lib
 core/mudrod.log*
+core/.classpath
+core/.externalToolBuilders/Maven_Ant_Builder.launch
+core/maven-eclipse.xml
+service/.classpath
+web/.classpath
diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java
index d62c627..75012a9 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MudrodAbstract.java
@@ -52,7 +52,7 @@ public MudrodAbstract(Properties props, ESDriver es, SparkDriver spark) {
     this.es = es;
     this.spark = spark;
 
-    if (this.props != null) {
+    if (this.props != null && this.es!=null) {
       this.initMudrod();
     }
   }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java
index 48df7de..3b7bde5 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/OntologyDiscoveryEngine.java
@@ -41,28 +41,28 @@ public OntologyDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark)
    * Method of preprocessing ontology
    */
   public void preprocess() {
-    LOG.info("*****************Ontology preprocessing starts******************");
+    LOG.info("Ontology preprocessing starts");
     startTime = System.currentTimeMillis();
 
     DiscoveryStepAbstract at = new AggregateTriples(this.props, this.es, this.spark);
     at.execute();
 
     endTime = System.currentTimeMillis();
-    LOG.info("*****************Ontology preprocessing ends******************Took {}s", (endTime - startTime) / 1000);
+    LOG.info("Ontology preprocessing ends. Took {}s", (endTime - startTime) / 1000);
   }
 
   /**
    * Method of processing ontology
    */
   public void process() {
-    LOG.info("*****************Ontology processing starts******************");
+    LOG.info("Ontology processing starts.");
     startTime = System.currentTimeMillis();
 
     DiscoveryStepAbstract ol = new OntologyLinkCal(this.props, this.es, this.spark);
     ol.execute();
 
     endTime = System.currentTimeMillis();
-    LOG.info("*****************Ontology processing ends******************Took {}s", (endTime - startTime) / 1000);
+    LOG.info("Ontology processing ends. Took {}s", (endTime - startTime) / 1000);
   }
 
   public void output() {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java
index d54a556..83ce59a 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/RecommendEngine.java
@@ -4,10 +4,10 @@
 import org.apache.sdap.mudrod.driver.SparkDriver;
 import org.apache.sdap.mudrod.recommendation.pre.ImportMetadata;
 import org.apache.sdap.mudrod.recommendation.pre.MetadataTFIDFGenerator;
-import org.apache.sdap.mudrod.recommendation.pre.NormalizeVariables;
+import org.apache.sdap.mudrod.recommendation.pre.NormalizeFeatures;
 import org.apache.sdap.mudrod.recommendation.pre.SessionCooccurence;
 import org.apache.sdap.mudrod.recommendation.process.AbstractBasedSimilarity;
-import org.apache.sdap.mudrod.recommendation.process.VariableBasedSimilarity;
+import org.apache.sdap.mudrod.recommendation.process.FeatureBasedSimilarity;
 import org.apache.sdap.mudrod.recommendation.process.SessionBasedCF;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,7 +26,7 @@ public RecommendEngine(Properties props, ESDriver es, SparkDriver spark) {
 
   @Override
   public void preprocess() {
-    LOG.info("*****************Recommendation preprocessing starts******************");
+    LOG.info("Recommendation preprocessing starts.");
 
     startTime = System.currentTimeMillis();
 
@@ -39,25 +39,25 @@ public void preprocess() {
     DiscoveryStepAbstract sessionMatrixGen = new SessionCooccurence(this.props, this.es, this.spark);
     sessionMatrixGen.execute();
 
-    DiscoveryStepAbstract transformer = new NormalizeVariables(this.props, this.es, this.spark);
+    DiscoveryStepAbstract transformer = new NormalizeFeatures(this.props, this.es, this.spark);
     transformer.execute();
 
     endTime = System.currentTimeMillis();
 
-    LOG.info("*****************Recommendation preprocessing  ends******************Took {}s {}", (endTime - startTime) / 1000);
+    LOG.info("Recommendation preprocessing ends. Took {}s {}", (endTime - startTime) / 1000);
   }
 
   @Override
   public void process() {
     // TODO Auto-generated method stub
-    LOG.info("*****************Recommendation processing starts******************");
+    LOG.info("Recommendation processing starts.");
 
     startTime = System.currentTimeMillis();
 
     DiscoveryStepAbstract tfCF = new AbstractBasedSimilarity(this.props, this.es, this.spark);
     tfCF.execute();
 
-    DiscoveryStepAbstract cbCF = new VariableBasedSimilarity(this.props, this.es, this.spark);
+    DiscoveryStepAbstract cbCF = new FeatureBasedSimilarity(this.props, this.es, this.spark);
     cbCF.execute();
 
     DiscoveryStepAbstract sbCF = new SessionBasedCF(this.props, this.es, this.spark);
@@ -65,12 +65,11 @@ public void process() {
 
     endTime = System.currentTimeMillis();
 
-    LOG.info("*****************Recommendation processing ends******************Took {}s {}", (endTime - startTime) / 1000);
+    LOG.info("Recommendation processing ends. Took {}s {}", (endTime - startTime) / 1000);
   }
 
   @Override
   public void output() {
-    // TODO Auto-generated method stub
 
   }
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
index 54b9128..6a751f2 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
@@ -177,6 +177,12 @@ public String customAnalyzing(String indexName, String analyzer, String str) thr
   }
 
   public void deleteAllByQuery(String index, String type, QueryBuilder query) {
+    ImmutableOpenMap<String, MappingMetaData> mappings = getClient().admin().cluster().prepareState().execute().actionGet()
+        .getState().metaData().index(index).getMappings();
+    
+    //check if the type exists
+    if (!mappings.containsKey(type)) return;
+    
     createBulkProcessor();
     SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
         .actionGet();
diff --git a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
index edb97ca..c8829dd 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/integration/LinkageIntegration.java
@@ -20,6 +20,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
@@ -42,7 +43,7 @@
   private static final long serialVersionUID = 1L;
   transient List<LinkedTerm> termList = new ArrayList<>();
   DecimalFormat df = new DecimalFormat("#.00");
-  private static final String INDEX_NAME = "indexName";
+  private static final String INDEX_NAME = MudrodConstants.ES_INDEX_NAME;
   private static final String WEIGHT = "weight";
 
   public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) {
@@ -108,13 +109,10 @@ public Object execute(Object o) {
       }
 
       double finalWeight = tmp + ((sumModelWeight - 2) * 0.05);
-      if (finalWeight < 0) {
-        finalWeight = 0;
-      }
-
-      if (finalWeight > 1) {
-        finalWeight = 1;
-      }
+      
+      if (finalWeight < 0) finalWeight = 0;
+      if (finalWeight > 1) finalWeight = 1;
+      
       termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight)));
     }
 
@@ -138,7 +136,7 @@ public String getIngeratedList(String input, int num) {
       }
       count++;
     }
-    LOG.info("\n************************Integrated results***************************");
+    LOG.info("Integrated results:");
     LOG.info(output);
     return output;
   }
@@ -173,32 +171,32 @@ public JsonObject getIngeratedListInJson(String input) {
    * the similarities from different sources
    */
   public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) {
-    aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType"));
-    aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType"));
-    aggregateRelatedTerms(input, props.getProperty("metadataLinkageType"));
-    aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType"));
+    aggregateRelatedTerms(input, MudrodConstants.USE_HISTORY_LINKAGE_TYPE);
+    aggregateRelatedTerms(input, MudrodConstants.CLICK_STREAM_LINKAGE_TYPE);
+    aggregateRelatedTerms(input, MudrodConstants.METADATA_LINKAGE_TYPE);
+    aggregateRelatedTermsSWEET(input, MudrodConstants.ONTOLOGY_LINKAGE_TYPE);
 
     return termList.stream().collect(Collectors.groupingBy(w -> w.term));
   }
 
   public int getModelweight(String model) {
-    if (model.equals(props.getProperty("userHistoryLinkageType"))) {
-      return Integer.parseInt(props.getProperty("userHistory_w"));
+    if (model.equals(MudrodConstants.USE_HISTORY_LINKAGE_TYPE)) {
+      return Integer.parseInt(props.getProperty(MudrodConstants.USER_HISTORY_W));
     }
 
-    if (model.equals(props.getProperty("clickStreamLinkageType"))) {
-      return Integer.parseInt(props.getProperty("clickStream_w"));
+    if (model.equals(MudrodConstants.CLICK_STREAM_LINKAGE_TYPE)) {
+      return Integer.parseInt(props.getProperty(MudrodConstants.CLICKSTREAM_W));
     }
 
-    if (model.equals(props.getProperty("metadataLinkageType"))) {
-      return Integer.parseInt(props.getProperty("metadata_w"));
+    if (model.equals(MudrodConstants.METADATA_LINKAGE_TYPE)) {
+      return Integer.parseInt(props.getProperty(MudrodConstants.METADATA_W));
     }
 
-    if (model.equals(props.getProperty("ontologyLinkageType"))) {
-      return Integer.parseInt(props.getProperty("ontology_w"));
+    if (model.equals(MudrodConstants.ONTOLOGY_LINKAGE_TYPE)) {
+      return Integer.parseInt(props.getProperty(MudrodConstants.ONTOLOGY_W));
     }
 
-    return 999999;
+    return Integer.MAX_VALUE;
   }
 
   /**
@@ -246,7 +244,7 @@ public void aggregateRelatedTerms(String input, String model) {
   public void aggregateRelatedTermsSWEET(String input, String model) {
     SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
         .setSize(11).execute().actionGet();
-    LOG.info("\n************************ {} results***************************", model);
+    LOG.info("{} results:", model);
     for (SearchHit hit : usrhis.getHits().getHits()) {
       Map<String, Object> result = hit.getSource();
       String conceptB = (String) result.get("concept_B");
diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
index d9435d9..0c8bcc2 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java
@@ -13,8 +13,6 @@
  */
 package org.apache.sdap.mudrod.main;
 
-import org.apache.sdap.mudrod.ontology.Ontology;
-
 /**
  * Class contains static constant keys and values relating to Mudrod
  * configuration properties. Property values are read from <a href=
@@ -22,54 +20,66 @@
  */
 public interface MudrodConstants {
 
-  public static final String CLEANUP_TYPE_PREFIX = "Cleanup_type_prefix";
-
-  public static final String CLICK_STREAM_LINKAGE_TYPE = "clickStreamLinkageType";
+  public static final String CLEANUP_TYPE = "cleanup.log";
 
-  public static final String CLICK_STREAM_MATRIX_TYPE = "clickStreamMatrixType";
+  public static final String CLICK_STREAM_LINKAGE_TYPE = "click.stream.linkage";
 
-  public static final String CLICKSTREAM_SVD_DIM = "clickstreamSVDDimension";
+  public static final String CLICK_STREAM_MATRIX_TYPE = "click.stream.matrix";
 
-  public static final String CLICKSTREAM_W = "clickStream_w";
+  public static final String CLICKSTREAM_SVD_DIM = "mudrod.clickstream.svd.d";
 
-  public static final String COMMENT_TYPE = "commentType";
+  public static final String CLICKSTREAM_W = "mudrod.clickstream.weight";
+  
+  public static final String CLICKSTREAM_PATH = "mudrod.clickstream.path";
+  
+  public static final String CLICKSTREAM_SVD_PATH = "mudrod.clickstream.svd.path";
 
   /** Defined on CLI */
   public static final String DATA_DIR = "dataDir";
 
-  public static final String DOWNLOAD_F = "downloadf";
-
-  public static final String DOWNLOAD_WEIGHT = "downloadWeight";
+  public static final String DOWNLOAD_WEIGHT = "mudrod.download.weight";
 
-  public static final String ES_CLUSTER = "clusterName";
+  public static final String ES_CLUSTER = "mudrod.cluster.name";
 
-  public static final String ES_TRANSPORT_TCP_PORT = "ES_Transport_TCP_Port";
+  public static final String ES_TRANSPORT_TCP_PORT = "mudrod.es.transport.tcp.port";
 
-  public static final String ES_UNICAST_HOSTS = "ES_unicast_hosts";
+  public static final String ES_UNICAST_HOSTS = "mudrod.es.unicast.hosts";
 
-  public static final String ES_HTTP_PORT = "ES_HTTP_port";
+  public static final String ES_HTTP_PORT = "mudrod.es.http.port";
 
-  public static final String ES_INDEX_NAME = "indexName";
+  public static final String ES_INDEX_NAME = "mudrod.es.index";
 
-  public static final String FTP_PREFIX = "ftpPrefix";
+  public static final String FTP_PREFIX = "mudrod.ftp.prefix";
 
-  public static final String FTP_TYPE_PREFIX = "FTP_type_prefix";
+  public static final String FTP_TYPE = "raw.ftp";
+  
+  public static final String FTP_LOG = "ftp";
 
-  public static final String HTTP_PREFIX = "httpPrefix";
+  public static final String HTTP_PREFIX = "mudrod.http.prefix";
 
-  public static final String HTTP_TYPE_PREFIX = "HTTP_type_prefix";
+  public static final String HTTP_TYPE = "raw.http";
+  
+  public static final String HTTP_LOG = "http";
+  
+  public static final String BASE_URL = "mudrod.base.url";
+  
+  public static final String BLACK_LIST_REQUEST = "mudrod.black.request.list";
+  
+  public static final String BLACK_LIST_AGENT = "mudrod.black.agent.list";
 
-  public static final String LOG_INDEX = "logIndexName";
+  public static final String LOG_INDEX = "mudrod.log.index";
 
-  public static final String METADATA_LINKAGE_TYPE = "metadataLinkageType";
+  public static final String METADATA_LINKAGE_TYPE = "metadata.linkage";
+  
+  public static final String METADATA_DOWNLOAD_URL = "mudrod.metadata.download.url";
 
-  public static final String METADATA_SVD_DIM = "metadataSVDDimension";
+  public static final String METADATA_SVD_DIM = "mudrod.metadata.svd.d";
 
-  public static final String METADATA_URL = "metadataurl";
+  public static final String METADATA_URL = "mudrod.metadata.url";
 
-  public static final String METADATA_W = "metadata_w";
+  public static final String METADATA_W = "mudrod.metadata.weight";
 
-  public static final String MINI_USER_HISTORY = "mini_userHistory";
+  public static final String QUERY_MIN = "mudrod.query.min";
 
   public static final String MUDROD = "mudrod";
 
@@ -80,44 +90,82 @@
    */
   public static final String ONTOLOGY_IMPL = MUDROD + "ontology.implementation";
 
-  public static final String ONTOLOGY_LINKAGE_TYPE = "ontologyLinkageType";
+  public static final String ONTOLOGY_LINKAGE_TYPE = "ontology.linkage";
 
-  public static final String ONTOLOGY_W = "ontology_w";
+  public static final String ONTOLOGY_W = "mudrod.ontology.weight";
+  
+  public static final String ONTOLOGY_PATH = "mudrod.ontology.path";
+  
+  public static final String ONTOLOGY_INPUT_PATH = "mudrod.ontology.input.path";
 
-  public static final String PROCESS_TYPE = "processingType";
+  public static final String PROCESS_TYPE = "mudrod.processing.type";
 
   /** Defined on CLI */
-  public static final String RAW_METADATA_PATH = "raw_metadataPath";
-
-  public static final String RAW_METADATA_TYPE = "raw_metadataType";
-
-  public static final String SEARCH_F = "searchf";
-
-  public static final String SENDING_RATE = "sendingrate";
-
-  public static final String SESSION_PORT = "SessionPort";
-
-  public static final String SESSION_STATS_PREFIX = "SessionStats_prefix";
-
-  public static final String SESSION_URL = "SessionUrl";
-
-  public static final String SPARK_APP_NAME = "spark.app.name";
-
-  public static final String SPARK_MASTER = "spark.master";
+  public static final String METADATA_DOWNLOAD = "mudrod.metadata.download";
+  
+  public static final String RAW_METADATA_PATH = "mudrod.metadata.path";
+
+  public static final String RAW_METADATA_TYPE = "mudrod.metadata.type";
+  
+  public static final String METADATA_MATRIX_PATH = "mudrod.metadata.matrix.path";
+  
+  public static final String METADATA_SVD_PATH = "mudrod.metadata.svd.path";
+  
+  public static final String RECOM_METADATA_TYPE = "recommedation.metadata";
+  
+  public static final String METADATA_ID = "mudrod.metadata.id";
+  
+  public static final String SEMANTIC_FIELDS = "mudrod.metadata.semantic.fields";
+  
+  public static final String METADATA_WORD_SIM_TYPE = "metadata.word.sim";
+  
+  public static final String METADATA_FEATURE_SIM_TYPE = "metadata.feature.sim";
+  
+  public static final String METADATA_SESSION_SIM_TYPE = "metadata.session.sim";
+  
+  public static final String METADATA_TERM_MATRIX_PATH = "metadata.term.matrix.path";
+  
+  public static final String METADATA_WORD_MATRIX_PATH = "metadata.word.matrix.path";
+  
+  public static final String METADATA_SESSION_MATRIX_PATH = "metadata.session.matrix.path";
+
+  public static final String REQUEST_RATE = "mudrod.request.rate";
+
+  public static final String SESSION_PORT = "mudrod.session.port";
+
+  public static final String SESSION_STATS_TYPE = "session.stats";
+
+  public static final String SESSION_URL = "mudrod.session.url";
+
+  public static final String SPARK_APP_NAME = "mudrod.spark.app.name";
+
+  public static final String SPARK_MASTER = "mudrod.spark.master";
   /**
    * Absolute local location of javaSVMWithSGDModel directory. This is typically
    * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code>
    */
-  public static final String SVM_SGD_MODEL = "svmSgdModel";
+  public static final String RANKING_MODEL = "mudrod.ranking.model";
+  
+  public static final String RANKING_ML = "mudrod.ranking.machine.learning";
 
-  public static final String TIMEGAP = "timegap";
+  public static final String REQUEST_TIME_GAP = "mudrod.request.time.gap";
 
-  public static final String TIME_SUFFIX = "TimeSuffix";
+  public static final String TIME_SUFFIX = "time.suffix";
 
-  public static final String USE_HISTORY_LINKAGE_TYPE = "userHistoryLinkageType";
+  public static final String USE_HISTORY_LINKAGE_TYPE = "user.history.linkage";
 
-  public static final String USER_HISTORY_W = "userHistory_w";
+  public static final String USER_HISTORY_W = "mudrod.user.history.weight";
+  
+  public static final String USER_HISTORY_PATH = "mudrod.user.history.path";
 
-  public static final String VIEW_F = "viewf";
+  public static final String VIEW_F = "mudrod.view.freq";
+  
+  public static final String VIEW_MARKER = "mudrod.view.url.marker";
+  
+  public static final String SEARCH_MARKER = "mudrod.search.url.marker";
+  
+  public static final String SEARCH_F = "mudrod.search.freq";
+  
+  public static final String DOWNLOAD_F = "mudrod.download.freq";
 
 }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java
index 359ae71..6784f04 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java
@@ -138,48 +138,37 @@ private InputStream locateConfig() {
       LOG.info("Loaded config file from " + configFile.getAbsolutePath());
       return configStream;
     } catch (IOException e) {
-      LOG.info("File specified by environment variable " + MudrodConstants.MUDROD_CONFIG + "=\'" + configLocation + "\' could not be loaded. " + e.getMessage());
+      LOG.info("File specified by environment variable {} = '{}' could not be loaded. Default configuration will be used.", MudrodConstants.MUDROD_CONFIG, configLocation, e.getMessage());   
     }
 
-    InputStream configStream = MudrodEngine.class.getClassLoader().getResourceAsStream("config.xml");
+    InputStream configStream = MudrodEngine.class.getClassLoader().getResourceAsStream("config.properties");
 
     if (configStream != null) {
-      LOG.info("Loaded config file from {}", MudrodEngine.class.getClassLoader().getResource("config.xml").getPath());
+      LOG.info("Loaded config file from {}", MudrodEngine.class.getClassLoader().getResource("config.properties").getPath());
     }
 
     return configStream;
   }
 
   /**
-   * Load the configuration provided at <a href=
-   * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>.
+   * Load the configuration provided at resources/config.properties.
    *
    * @return a populated {@link java.util.Properties} object.
    */
   public Properties loadConfig() {
-    SAXBuilder saxBuilder = new SAXBuilder();
-
     InputStream configStream = locateConfig();
-
-    Document document;
     try {
-      document = saxBuilder.build(configStream);
-      Element rootNode = document.getRootElement();
-      List<Element> paraList = rootNode.getChildren("para");
-
-      for (Element paraNode : paraList) {
-        String attributeName = paraNode.getAttributeValue("name");
-        if (MudrodConstants.SVM_SGD_MODEL.equals(attributeName)) {
-          props.put(attributeName, decompressSVMWithSGDModel(paraNode.getTextTrim()));
-        } else {
-          props.put(attributeName, paraNode.getTextTrim());
-        }
+      props.load(configStream);
+      for(String key : props.stringPropertyNames()) {
+        props.put(key, props.getProperty(key).trim());
       }
-    } catch (JDOMException | IOException e) {
-      LOG.error("Exception whilst retrieving or processing XML contained within 'config.xml'!", e);
+      String rankingModelPath = props.getProperty(MudrodConstants.RANKING_MODEL);
+      props.put(MudrodConstants.RANKING_MODEL, decompressSVMWithSGDModel(rankingModelPath));
+    } catch (IOException e) {
+      LOG.info("Fail to load the sytem config file");
     }
+    
     return getConfig();
-
   }
 
   private String decompressSVMWithSGDModel(String archiveName) throws IOException {
@@ -232,7 +221,7 @@ public void startLogIngest() {
     DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
     wd.preprocess();
     wd.process();
-    LOG.info("*****************logs have been ingested successfully******************");
+    LOG.info("Logs have been ingested successfully");
   }
 
   /**
@@ -383,7 +372,7 @@ public static void main(String[] args) {
 
       me.es = new ESDriver(me.getConfig());
       me.spark = new SparkDriver(me.getConfig());
-      loadFullConfig(me, dataDir);
+      loadPathConfig(me, dataDir);
       if (processingType != null) {
         switch (processingType) {
         case PROCESSING:
@@ -410,27 +399,19 @@ public static void main(String[] args) {
     }
   }
 
-  private static void loadFullConfig(MudrodEngine me, String dataDir) {
-    //TODO all of the properties defined below, which are determined are
-    //runtime need to be added to MudrodConstants.java and referenced 
-    //accordingly and consistently from Properties.getProperty(MudrodConstant...);
-    me.props.put("ontologyInputDir", dataDir + "SWEET_ocean/");
-    me.props.put("oceanTriples", dataDir + "Ocean_triples.csv");
-    me.props.put("userHistoryMatrix", dataDir + "UserHistoryMatrix.csv");
-    me.props.put("clickstreamMatrix", dataDir + "ClickstreamMatrix.csv");
-    me.props.put("metadataMatrix", dataDir + "MetadataMatrix.csv");
-    me.props.put("clickstreamSVDMatrix_tmp", dataDir + "clickstreamSVDMatrix_tmp.csv");
-    me.props.put("metadataSVDMatrix_tmp", dataDir + "metadataSVDMatrix_tmp.csv");
-    me.props.put("raw_metadataPath", dataDir + me.props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
-
-    me.props.put("jtopia", dataDir + "jtopiaModel");
-    me.props.put("metadata_term_tfidf_matrix", dataDir + "metadata_term_tfidf.csv");
-    me.props.put("metadata_word_tfidf_matrix", dataDir + "metadata_word_tfidf.csv");
-    me.props.put("session_metadata_Matrix", dataDir + "metadata_session_coocurrence_matrix.csv");
-
-    me.props.put("metadataOBCode", dataDir + "MetadataOHCode");
-    me.props.put("metadata_topic", dataDir + "metadata_topic");
-    me.props.put("metadata_topic_matrix", dataDir + "metadata_topic_matrix.csv");
+  private static void loadPathConfig(MudrodEngine me, String dataDir) {
+    me.props.put(MudrodConstants.ONTOLOGY_INPUT_PATH, dataDir + "SWEET_ocean/");
+    me.props.put(MudrodConstants.ONTOLOGY_PATH, dataDir + "ocean_triples.csv");
+    me.props.put(MudrodConstants.USER_HISTORY_PATH, dataDir + "userhistorymatrix.csv");
+    me.props.put(MudrodConstants.CLICKSTREAM_PATH, dataDir + "clickstreammatrix.csv");
+    me.props.put(MudrodConstants.METADATA_MATRIX_PATH, dataDir + "metadatamatrix.csv");
+    me.props.put(MudrodConstants.CLICKSTREAM_SVD_PATH, dataDir + "clickstreamsvdmatrix_tmp.csv");
+    me.props.put(MudrodConstants.METADATA_SVD_PATH, dataDir + "metadatasvdMatrix_tmp.csv");
+    me.props.put(MudrodConstants.RAW_METADATA_PATH, dataDir + me.props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
+
+    me.props.put(MudrodConstants.METADATA_TERM_MATRIX_PATH, dataDir + "metadata_term_tfidf.csv");
+    me.props.put(MudrodConstants.METADATA_WORD_MATRIX_PATH, dataDir + "metadata_word_tfidf.csv");
+    me.props.put(MudrodConstants.METADATA_SESSION_MATRIX_PATH, dataDir + "metadata_session_coocurrence_matrix.csv");
   }
 
   /**
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
index 32b4609..b74074a 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/ApiHarvester.java
@@ -57,7 +57,8 @@ public Object execute() {
     //remove old metadata from ES
     es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
     //harvest new metadata using PO.DAAC web services
-    harvestMetadatafromWeb();
+    if("1".equals(props.getProperty(MudrodConstants.METADATA_DOWNLOAD))) 
+      harvestMetadatafromWeb();
     es.createBulkProcessor();
     addMetadataMapping();
     importToES();
@@ -125,7 +126,8 @@ private void harvestMetadatafromWeb() {
     int doc_length = 0;
     JsonParser parser = new JsonParser();
     do {
-      String searchAPI = "https://podaac.jpl.nasa.gov/api/dataset?startIndex=" + Integer.toString(startIndex) + "&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search=";
+      String searchAPI = props.getProperty(MudrodConstants.METADATA_DOWNLOAD_URL);
+      searchAPI = searchAPI.replace("$startIndex", Integer.toString(startIndex));
       HttpRequest http = new HttpRequest();
       String response = http.getRequest(searchAPI);
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
index 63565b2..e4a6320 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/pre/MatrixGenerator.java
@@ -61,7 +61,7 @@ public Object execute() {
     LOG.info("Metadata matrix started");
     startTime = System.currentTimeMillis();
 
-    String metadataMatrixFile = props.getProperty("metadataMatrix");
+    String metadataMatrixFile = props.getProperty(MudrodConstants.METADATA_MATRIX_PATH);
     try {
       MetadataExtractor extractor = new MetadataExtractor();
       JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
index 80e23c1..303eb52 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/process/MetadataAnalyzer.java
@@ -16,6 +16,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.semantics.SVDAnalyzer;
 import org.apache.sdap.mudrod.utils.LinkageTriple;
 import org.slf4j.Logger;
@@ -63,18 +64,18 @@ public Object execute(Object o) {
   @Override
   public Object execute() {
     try {
-      LOG.info("*****************Metadata Analyzer starts******************");
+      LOG.info("Metadata Analyzer starts.");
       startTime = System.currentTimeMillis();
 
       SVDAnalyzer analyzer = new SVDAnalyzer(props, es, spark);
-      int svdDimension = Integer.parseInt(props.getProperty("metadataSVDDimension"));
-      String metadataMatrixFile = props.getProperty("metadataMatrix");
-      String svdMatrixFileName = props.getProperty("metadataSVDMatrix_tmp");
+      int svdDimension = Integer.parseInt(props.getProperty(MudrodConstants.METADATA_SVD_DIM));
+      String metadataMatrixFile = props.getProperty(MudrodConstants.METADATA_MATRIX_PATH);
+      String svdMatrixFileName = props.getProperty(MudrodConstants.METADATA_SVD_PATH);
 
       analyzer.getSVDMatrix(metadataMatrixFile, svdDimension, svdMatrixFileName);
       List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(svdMatrixFileName);
 
-      analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataLinkageType"));
+      analyzer.saveToES(triples, props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.METADATA_LINKAGE_TYPE);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -82,7 +83,7 @@ public Object execute() {
 
     endTime = System.currentTimeMillis();
     es.refreshIndex();
-    LOG.info("*****************Metadata Analyzer ends******************Took {}s", (endTime - startTime) / 1000);
+    LOG.info("Metadata Analyzer ends. Took {}s", (endTime - startTime) / 1000);
     return null;
   }
 }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/Metadata.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/Metadata.java
new file mode 100644
index 0000000..c420c94
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/Metadata.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.metadata.structure;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+
+public abstract class Metadata implements Serializable  {
+
+  private static final long serialVersionUID = 1L;
+  protected String shortname;
+  
+  public Metadata() {
+  }
+
+  public Metadata(String shortname) {
+    this.shortname = shortname;
+  }
+
+  /**
+   * getShortName:get short name of data set
+   *
+   * @return data set short name
+   */
+  public String getShortName() {
+    return this.shortname;
+  }
+
+  /**
+   * getAbstract:get abstract of data set
+   *
+   * @return data set abstract
+   */
+  public abstract List<String> getAllTermList();
+}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/MetadataExtractor.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/MetadataExtractor.java
index 379d5b9..5f84206 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/MetadataExtractor.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/MetadataExtractor.java
@@ -54,7 +54,7 @@ public MetadataExtractor() {
    * list extracted from metadata variables.
    */
   public JavaPairRDD<String, List<String>> loadMetadata(ESDriver es, JavaSparkContext sc, String index, String type) {
-    List<PODAACMetadata> metadatas = this.loadMetadataFromES(es, index, type);
+    List<Metadata> metadatas = this.loadMetadataFromES(es, index, type);
     JavaPairRDD<String, List<String>> metadataTermsRDD = this.buildMetadataRDD(es, sc, index, metadatas);
     return metadataTermsRDD;
   }
@@ -67,31 +67,15 @@ public MetadataExtractor() {
    * @param type  metadata type name
    * @return metadata list
    */
-  protected List<PODAACMetadata> loadMetadataFromES(ESDriver es, String index, String type) {
+  protected List<Metadata> loadMetadataFromES(ESDriver es, String index, String type) {
 
-    List<PODAACMetadata> metadatas = new ArrayList<PODAACMetadata>();
+    List<Metadata> metadatas = new ArrayList<>();
     SearchResponse scrollResp = es.getClient().prepareSearch(index).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setScroll(new TimeValue(60000)).setSize(100).execute().actionGet();
 
     while (true) {
       for (SearchHit hit : scrollResp.getHits().getHits()) {
         Map<String, Object> result = hit.getSource();
-        String shortname = (String) result.get("Dataset-ShortName");
-        List<String> topic = (List<String>) result.get("DatasetParameter-Topic");
-        List<String> term = (List<String>) result.get("DatasetParameter-Term");
-        List<String> keyword = (List<String>) result.get("Dataset-Metadata");
-        List<String> variable = (List<String>) result.get("DatasetParameter-Variable");
-        List<String> longname = (List<String>) result.get("DatasetProject-Project-LongName");
-
-        List<String> region = (List<String>) result.get("DatasetRegion-Region");
-
-        PODAACMetadata metadata = null;
-        try {
-          metadata = new PODAACMetadata(shortname, longname, es.customAnalyzing(index, topic), es.customAnalyzing(index, term), es.customAnalyzing(index, variable), es.customAnalyzing(index, keyword),
-              es.customAnalyzing(index, region));
-        } catch (InterruptedException | ExecutionException e) {
-          e.printStackTrace();
-
-        }
+        Metadata metadata = new PODAACMetadata(result, es, index);
         metadatas.add(metadata);
       }
       scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
@@ -113,16 +97,16 @@ public MetadataExtractor() {
    * @return PairRDD, in each pair key is metadata short name and value is term
    * list extracted from metadata variables.
    */
-  protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<PODAACMetadata> metadatas) {
-    JavaRDD<PODAACMetadata> metadataRDD = sc.parallelize(metadatas);
-    JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD.mapToPair(new PairFunction<PODAACMetadata, String, List<String>>() {
+  protected JavaPairRDD<String, List<String>> buildMetadataRDD(ESDriver es, JavaSparkContext sc, String index, List<Metadata> metadatas) {
+    JavaRDD<Metadata> metadataRDD = sc.parallelize(metadatas);
+    JavaPairRDD<String, List<String>> metadataTermsRDD = metadataRDD.mapToPair(new PairFunction<Metadata, String, List<String>>() {
       /**
        *
        */
       private static final long serialVersionUID = 1L;
 
       @Override
-      public Tuple2<String, List<String>> call(PODAACMetadata metadata) throws Exception {
+      public Tuple2<String, List<String>> call(Metadata metadata) throws Exception {
         return new Tuple2<String, List<String>>(metadata.getShortName(), metadata.getAllTermList());
       }
     }).reduceByKey(new Function2<List<String>, List<String>, List<String>>() {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/PODAACMetadata.java b/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/PODAACMetadata.java
index 4c3070b..a0c91d1 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/PODAACMetadata.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/metadata/structure/PODAACMetadata.java
@@ -13,90 +13,124 @@
  */
 package org.apache.sdap.mudrod.metadata.structure;
 
-import java.io.Serializable;
+import org.apache.sdap.mudrod.driver.ESDriver;
+
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * ClassName: PODAACMetadata Function: PODAACMetadata setter and getter methods
  */
-public class PODAACMetadata implements Serializable {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  // shortname: data set short name
-  private String shortname;
-  // abstractStr: data set abstract
-  private String abstractStr;
-  // isoTopic: data set topic
-  private String isoTopic;
-  // sensor: sensor
-  private String sensor;
-  // source: data source
-  private String source;
-  // project: data project
-  private String project;
-  // hasAbstarct: whether data set has abstract
-  boolean hasAbstarct;
-
-  // longnameList: data set long name list
-  private List<String> longnameList;
-  // keywordList:data set key word list
-  private List<String> keywordList;
-  // termList: data set term list
-  private List<String> termList;
-  // topicList: data set topic list
-  private List<String> topicList;
-  // variableList: data set variable list
-  private List<String> variableList;
-  // abstractList: data set abstract term list
-  private List<String> abstractList;
-  // isotopicList: data set iso topic list
-  private List<String> isotopicList;
-  // sensorList: data set sensor list
-  private List<String> sensorList;
-  // sourceList: data set source list
-  private List<String> sourceList;
-  // projectList: data set project list
-  private List<String> projectList;
-  // regionList: data set region list
-  private List<String> regionList;
-
-  public PODAACMetadata() {
-    // Default constructor
-  }
-
-  /**
-   * Creates a new instance of PODAACMetadata.
-   *
-   * @param shortname data set short name
-   * @param longname  data set long name
-   * @param topics    data set topics
-   * @param terms     data set terms
-   * @param variables data set variables
-   * @param keywords  data set keywords
-   * @param region    list of regions
-   */
-  public PODAACMetadata(String shortname, List<String> longname, List<String> topics, List<String> terms, List<String> variables, List<String> keywords, List<String> region) {
-    this.shortname = shortname;
-    this.longnameList = longname;
-    this.keywordList = keywords;
-    this.termList = terms;
-    this.topicList = topics;
-    this.variableList = variables;
-    this.regionList = region;
-  }
-
-  /**
-   * setTerms: set term of data set
-   *
-   * @param termstr data set terms
-   */
-  public void setTerms(String termstr) {
-    this.splitString(termstr, this.termList);
-  }
+public class PODAACMetadata extends Metadata {
+
+	/**
+	 *
+	 */
+	private static final long serialVersionUID = 1L;
+	// shortname: data set short name
+	// private String shortname;
+	// abstractStr: data set abstract
+	private String abstractStr;
+	// isoTopic: data set topic
+	private String isoTopic;
+	// sensor: sensor
+	private String sensor;
+	// source: data source
+	private String source;
+	// project: data project
+	private String project;
+	// hasAbstarct: whether data set has abstract
+	boolean hasAbstarct;
+
+	// longnameList: data set long name list
+	private List<String> longnameList;
+	// keywordList:data set key word list
+	private List<String> keywordList;
+	// termList: data set term list
+	private List<String> termList;
+	// topicList: data set topic list
+	private List<String> topicList;
+	// variableList: data set variable list
+	private List<String> variableList;
+	// abstractList: data set abstract term list
+	private List<String> abstractList;
+	// isotopicList: data set iso topic list
+	private List<String> isotopicList;
+	// sensorList: data set sensor list
+	private List<String> sensorList;
+	// sourceList: data set source list
+	private List<String> sourceList;
+	// projectList: data set project list
+	private List<String> projectList;
+	// regionList: data set region list
+	private List<String> regionList;
+
+	public PODAACMetadata() {
+		// Default constructor
+	}
+
+	/**
+	 * Creates a new instance of PODAACMetadata.
+	 *
+	 * @param shortname
+	 *            data set short name
+	 * @param longname
+	 *            data set long name
+	 * @param topics
+	 *            data set topics
+	 * @param terms
+	 *            data set terms
+	 * @param variables
+	 *            data set variables
+	 * @param keywords
+	 *            data set keywords
+	 * @param region
+	 *            list of regions
+	 */
+	public PODAACMetadata(String shortname, List<String> longname, List<String> topics, List<String> terms,
+			List<String> variables, List<String> keywords, List<String> region) {
+		this.shortname = shortname;
+		this.longnameList = longname;
+		this.keywordList = keywords;
+		this.termList = terms;
+		this.topicList = topics;
+		this.variableList = variables;
+		this.regionList = region;
+	}
+
+	public PODAACMetadata(Map<String, Object> result, ESDriver es, String index) {
+
+		String shortname = (String) result.get("Dataset-ShortName");
+		List<String> topic = (List<String>) result.get("DatasetParameter-Topic");
+		List<String> term = (List<String>) result.get("DatasetParameter-Term");
+		List<String> keyword = (List<String>) result.get("Dataset-Metadata");
+		List<String> variable = (List<String>) result.get("DatasetParameter-Variable");
+		List<String> longname = (List<String>) result.get("DatasetProject-Project-LongName");
+		List<String> region = (List<String>) result.get("DatasetRegion-Region");
+
+		this.shortname = shortname;
+		this.longnameList = longname;
+		try {
+			this.keywordList = es.customAnalyzing(index, keyword);
+			this.termList = es.customAnalyzing(index, term);
+			this.topicList = es.customAnalyzing(index, topic);
+			this.variableList = es.customAnalyzing(index, variable);
+			this.regionList = es.customAnalyzing(index, region);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * setTerms: set term of data set
+	 *
+	 * @param termstr
+	 *            data set terms
+	 */
+	public void setTerms(String termstr) {
+		this.splitString(termstr, this.termList);
+	}
 
   /**
    * setKeywords: set key word of data set
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ontology/pre/AggregateTriples.java b/core/src/main/java/org/apache/sdap/mudrod/ontology/pre/AggregateTriples.java
index e988d15..f1a9c04 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ontology/pre/AggregateTriples.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ontology/pre/AggregateTriples.java
@@ -17,6 +17,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.jdom2.Document;
 import org.jdom2.Element;
 import org.jdom2.JDOMException;
@@ -51,7 +52,7 @@ public AggregateTriples(Properties props, ESDriver es, SparkDriver spark) {
    */
   @Override
   public Object execute() {
-    File file = new File(this.props.getProperty("oceanTriples"));
+    File file = new File(this.props.getProperty(MudrodConstants.ONTOLOGY_PATH));
     if (file.exists()) {
       file.delete();
     }
@@ -69,7 +70,7 @@ public Object execute() {
       e.printStackTrace();
     }
 
-    File[] files = new File(this.props.getProperty("ontologyInputDir")).listFiles();
+    File[] files = new File(this.props.getProperty(MudrodConstants.ONTOLOGY_INPUT_PATH)).listFiles();
     for (File file_in : files) {
       String ext = FilenameUtils.getExtension(file_in.getAbsolutePath());
       if ("owl".equals(ext)) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ontology/process/OntologyLinkCal.java b/core/src/main/java/org/apache/sdap/mudrod/ontology/process/OntologyLinkCal.java
index eb6aeff..0c5301d 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ontology/process/OntologyLinkCal.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ontology/process/OntologyLinkCal.java
@@ -16,6 +16,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -35,7 +36,7 @@
 
   public OntologyLinkCal(Properties props, ESDriver es, SparkDriver spark) {
     super(props, es, spark);
-    es.deleteAllByQuery(props.getProperty("indexName"), props.getProperty("ontologyLinkageType"), QueryBuilders.matchAllQuery());
+    es.deleteAllByQuery(props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.ONTOLOGY_LINKAGE_TYPE, QueryBuilders.matchAllQuery());
     addSWEETMapping();
   }
 
@@ -45,12 +46,13 @@ public OntologyLinkCal(Properties props, ESDriver es, SparkDriver spark) {
   public void addSWEETMapping() {
     XContentBuilder Mapping;
     try {
-      Mapping = jsonBuilder().startObject().startObject(props.getProperty("ontologyLinkageType")).startObject("properties").startObject("concept_A").field("type", "string")
+      Mapping = jsonBuilder().startObject().startObject(MudrodConstants.ONTOLOGY_LINKAGE_TYPE).startObject("properties").startObject("concept_A").field("type", "string")
           .field("index", "not_analyzed").endObject().startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
 
           .endObject().endObject().endObject();
 
-      es.getClient().admin().indices().preparePutMapping(props.getProperty("indexName")).setType(props.getProperty("ontologyLinkageType")).setSource(Mapping).execute().actionGet();
+      es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME))
+      .setType(MudrodConstants.ONTOLOGY_LINKAGE_TYPE).setSource(Mapping).execute().actionGet();
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -61,7 +63,7 @@ public void addSWEETMapping() {
    */
   @Override
   public Object execute() {
-    es.deleteType(props.getProperty("indexName"), props.getProperty("ontologyLinkageType"));
+    es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.ONTOLOGY_LINKAGE_TYPE);
     es.createBulkProcessor();
 
     BufferedReader br = null;
@@ -69,7 +71,7 @@ public Object execute() {
     double weight = 0;
 
     try {
-      br = new BufferedReader(new FileReader(props.getProperty("oceanTriples")));
+      br = new BufferedReader(new FileReader(props.getProperty(MudrodConstants.ONTOLOGY_PATH)));
       while ((line = br.readLine()) != null) {
         String[] strList = line.toLowerCase().split(",");
         if (strList[1].equals("subclassof")) {
@@ -78,9 +80,9 @@ public Object execute() {
           weight = 0.9;
         }
 
-        IndexRequest ir = new IndexRequest(props.getProperty("indexName"), props.getProperty("ontologyLinkageType")).source(
-            jsonBuilder().startObject().field("concept_A", es.customAnalyzing(props.getProperty("indexName"), strList[2]))
-                .field("concept_B", es.customAnalyzing(props.getProperty("indexName"), strList[0])).field("weight", weight).endObject());
+        IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.ONTOLOGY_LINKAGE_TYPE).source(
+            jsonBuilder().startObject().field("concept_A", es.customAnalyzing(props.getProperty(MudrodConstants.ES_INDEX_NAME), strList[2]))
+                .field("concept_B", es.customAnalyzing(props.getProperty(MudrodConstants.ES_INDEX_NAME), strList[0])).field("weight", weight).endObject());
         es.getBulkProcessor().add(ir);
 
       }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java
index 7bb1d22..b6f7df1 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/ImportMetadata.java
@@ -65,8 +65,12 @@ public void addMetadataMapping() {
     String mappingJson = "{\r\n   \"dynamic_templates\": " + "[\r\n      " + "{\r\n         \"strings\": " + "{\r\n            \"match_mapping_type\": \"string\","
         + "\r\n            \"mapping\": {\r\n               \"type\": \"string\"," + "\r\n               \"analyzer\": \"csv\"\r\n            }" + "\r\n         }\r\n      }\r\n   ]\r\n}";
 
-    es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(props.getProperty("recom_metadataType")).setSource(mappingJson).execute().actionGet();
-
+    es.getClient().admin().indices().
+    preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME))
+    .setType(MudrodConstants.RECOM_METADATA_TYPE)
+    .setSource(mappingJson)
+    .execute()
+    .actionGet();
   }
 
   /**
@@ -75,7 +79,7 @@ public void addMetadataMapping() {
    * invoking this method.
    */
   private void importToES() {
-    es.deleteType(props.getProperty("indexName"), props.getProperty("recom_metadataType"));
+    es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.RECOM_METADATA_TYPE);
 
     es.createBulkProcessor();
     File directory = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
@@ -88,9 +92,8 @@ private void importToES() {
           String jsonTxt = IOUtils.toString(is);
           JsonParser parser = new JsonParser();
           JsonElement item = parser.parse(jsonTxt);
-          IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty("recom_metadataType")).source(item.toString());
-
-          // preprocessdata
+          IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), 
+              MudrodConstants.RECOM_METADATA_TYPE).source(item.toString());
 
           es.getBulkProcessor().add(ir);
         } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
index eb22630..e17d2ce 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
@@ -16,7 +16,8 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
-import org.apache.sdap.mudrod.recommendation.structure.MetadataOpt;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.recommendation.structure.MetadataTokenizer;
 import org.apache.sdap.mudrod.utils.LabeledRowMatrix;
 import org.apache.sdap.mudrod.utils.MatrixUtil;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -24,6 +25,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
@@ -70,35 +72,35 @@ public Object execute(Object o) {
 
   public LabeledRowMatrix generateWordBasedTFIDF() throws Exception {
 
-    MetadataOpt opt = new MetadataOpt(props);
+    MetadataTokenizer opt = new MetadataTokenizer(props);
 
-    JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark);
+    String metadataName = props.getProperty(MudrodConstants.METADATA_ID);
+    JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark, metadataName);
 
     JavaPairRDD<String, List<String>> metadataWords = opt.tokenizeData(metadataContents, " ");
 
     LabeledRowMatrix wordtfidfMatrix = opt.tFIDFTokens(metadataWords, spark);
 
-    MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys, wordtfidfMatrix.colkeys, props.getProperty("metadata_word_tfidf_matrix"));
+    MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys, wordtfidfMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
 
     return wordtfidfMatrix;
   }
 
   public LabeledRowMatrix generateTermBasedTFIDF() throws Exception {
 
-    MetadataOpt opt = new MetadataOpt(props);
+    MetadataTokenizer opt = new MetadataTokenizer(props);
 
-    List<String> variables = new ArrayList<>();
-    variables.add("DatasetParameter-Term");
-    variables.add("DatasetParameter-Variable");
-    variables.add("Dataset-ExtractTerm");
+    String source = props.getProperty(MudrodConstants.SEMANTIC_FIELDS);
+    List<String> variables = new ArrayList<>(Arrays.asList(source.split(",")));
 
-    JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark, variables);
+    String metadataName = props.getProperty(MudrodConstants.METADATA_ID);
+    JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark, variables, metadataName);
 
     JavaPairRDD<String, List<String>> metadataTokens = opt.tokenizeData(metadataContents, ",");
 
     LabeledRowMatrix tokentfidfMatrix = opt.tFIDFTokens(metadataTokens, spark);
 
-    MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix, tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys, props.getProperty("metadata_term_tfidf_matrix"));
+    MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix, tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_TERM_MATRIX_PATH));
 
     return tokentfidfMatrix;
   }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeFeatures.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeFeatures.java
new file mode 100644
index 0000000..458ccbb
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeFeatures.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing, processing, and data structure used
+ * by recommendation module.
+ */
+package org.apache.sdap.mudrod.recommendation.pre;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.recommendation.structure.MetadataFeature;
+import org.apache.sdap.mudrod.recommendation.structure.PODAACMetadataFeature;
+
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+public class NormalizeFeatures extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(NormalizeFeatures.class);
+  // index name
+  private String indexName;
+  // type name of metadata in ES
+  private String metadataType;
+
+  /**
+   * Creates a new instance of OHEncoder.
+   *
+   * @param props the Mudrod configuration
+   * @param es    an instantiated {@link ESDriver}
+   * @param spark an instantiated {@link SparkDriver}
+   */
+  public NormalizeFeatures(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
+    metadataType = MudrodConstants.RECOM_METADATA_TYPE;
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Preprocessing metadata feature starts.");
+    startTime = System.currentTimeMillis();
+
+    normalizeMetadataVariables(es);
+
+    endTime = System.currentTimeMillis();
+    LOG.info("Preprocessing metadata feature ends. Took {}s", (endTime - startTime) / 1000);
+
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public void normalizeMetadataVariables(ESDriver es) {
+
+    es.createBulkProcessor();
+
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> metadata = hit.getSource();
+        Map<String, Object> updatedValues = new HashMap<>();
+
+        //!!!important change to other normalizer class when using other metadata
+        MetadataFeature normalizer = new PODAACMetadataFeature();
+        normalizer.normalizeMetadataVariables(metadata, updatedValues);
+
+        UpdateRequest ur = es.generateUpdateRequest(indexName, metadataType, hit.getId(), updatedValues);
+        es.getBulkProcessor().add(ur);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java
deleted file mode 100644
index 28ffd5d..0000000
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/NormalizeVariables.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package org.apache.sdap.mudrod.recommendation.pre;
-
-import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
-import org.apache.sdap.mudrod.driver.ESDriver;
-import org.apache.sdap.mudrod.driver.SparkDriver;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
-public class NormalizeVariables extends DiscoveryStepAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(NormalizeVariables.class);
-  // index name
-  private String indexName;
-  // type name of metadata in ES
-  private String metadataType;
-
-  /**
-   * Creates a new instance of OHEncoder.
-   *
-   * @param props the Mudrod configuration
-   * @param es    an instantiated {@link ESDriver}
-   * @param spark an instantiated {@link SparkDriver}
-   */
-  public NormalizeVariables(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-    indexName = props.getProperty("indexName");
-    metadataType = props.getProperty("recom_metadataType");
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("*****************processing metadata variables starts******************");
-    startTime = System.currentTimeMillis();
-
-    normalizeMetadataVariables(es);
-
-    endTime = System.currentTimeMillis();
-    LOG.info("*****************processing metadata variables ends******************Took {}s", (endTime - startTime) / 1000);
-
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  public void normalizeMetadataVariables(ESDriver es) {
-
-    es.createBulkProcessor();
-
-    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
-        .actionGet();
-    while (true) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        Map<String, Object> metadata = hit.getSource();
-        Map<String, Object> updatedValues = new HashMap<>();
-
-        this.normalizeSpatialVariables(metadata, updatedValues);
-        this.normalizeTemporalVariables(metadata, updatedValues);
-        this.normalizeOtherVariables(metadata, updatedValues);
-
-        UpdateRequest ur = es.generateUpdateRequest(indexName, metadataType, hit.getId(), updatedValues);
-        es.getBulkProcessor().add(ur);
-      }
-
-      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
-      if (scrollResp.getHits().getHits().length == 0) {
-        break;
-      }
-    }
-
-    es.destroyBulkProcessor();
-  }
-
-  private void normalizeOtherVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
-    String shortname = (String) metadata.get("Dataset-ShortName");
-    double versionNUm = getVersionNum(shortname);
-    updatedValues.put("Dataset-Derivative-VersionNum", versionNUm);
-
-  }
-
-  private Double getVersionNum(String version) {
-    if (version == null) {
-      return 0.0;
-    }
-    Double versionNum;
-    Pattern p = Pattern.compile(".*[a-zA-Z].*");
-    if ("Operational/Near-Real-Time".equals(version)) {
-      versionNum = 2.0;
-    } else if (version.matches("[0-9]{1}[a-zA-Z]{1}")) {
-      versionNum = Double.parseDouble(version.substring(0, 1));
-    } else if (p.matcher(version).find()) {
-      versionNum = 0.0;
-    } else {
-      versionNum = Double.parseDouble(version);
-      if (versionNum >= 5) {
-        versionNum = 20.0;
-      }
-    }
-    return versionNum;
-  }
-
-  private void normalizeSpatialVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
-
-    // get spatial resolution
-    Double spatialR;
-    if (metadata.get("Dataset-SatelliteSpatialResolution") != null) {
-      spatialR = (Double) metadata.get("Dataset-SatelliteSpatialResolution");
-    } else {
-      Double gridR = (Double) metadata.get("Dataset-GridSpatialResolution");
-      if (gridR != null) {
-        spatialR = 111 * gridR;
-      } else {
-        spatialR = 25.0;
-      }
-    }
-    updatedValues.put("Dataset-Derivative-SpatialResolution", spatialR);
-
-    // Transform Longitude and calculate coverage area
-    double top = parseDouble((String) metadata.get("DatasetCoverage-NorthLat"));
-    double bottom = parseDouble((String) metadata.get("DatasetCoverage-SouthLat"));
-    double left = parseDouble((String) metadata.get("DatasetCoverage-WestLon"));
-    double right = parseDouble((String) metadata.get("DatasetCoverage-EastLon"));
-
-    if (left > 180) {
-      left = left - 360;
-    }
-
-    if (right > 180) {
-      right = right - 360;
-    }
-
-    if (left == right) {
-      left = -180;
-      right = 180;
-    }
-
-    double area = (top - bottom) * (right - left);
-
-    updatedValues.put("DatasetCoverage-Derivative-EastLon", right);
-    updatedValues.put("DatasetCoverage-Derivative-WestLon", left);
-    updatedValues.put("DatasetCoverage-Derivative-NorthLat", top);
-    updatedValues.put("DatasetCoverage-Derivative-SouthLat", bottom);
-    updatedValues.put("DatasetCoverage-Derivative-Area", area);
-
-    // get processing level
-    String processingLevel = (String) metadata.get("Dataset-ProcessingLevel");
-    double dProLevel = this.getProLevelNum(processingLevel);
-    updatedValues.put("Dataset-Derivative-ProcessingLevel", dProLevel);
-  }
-
-  private void normalizeTemporalVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
-
-    String trStr = (String) metadata.get("Dataset-TemporalResolution");
-    if ("".equals(trStr)) {
-      trStr = (String) metadata.get("Dataset-TemporalRepeat");
-    }
-
-    updatedValues.put("Dataset-Derivative-TemporalResolution", covertTimeUnit(trStr));
-  }
-
-  private Double covertTimeUnit(String str) {
-    Double timeInHour;
-    if (str.contains("Hour")) {
-      timeInHour = Double.parseDouble(str.split(" ")[0]);
-    } else if (str.contains("Day")) {
-      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24;
-    } else if (str.contains("Week")) {
-      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7;
-    } else if (str.contains("Month")) {
-      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30;
-    } else if (str.contains("Year")) {
-      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30 * 365;
-    } else {
-      timeInHour = 0.0;
-    }
-
-    return timeInHour;
-  }
-
-  public Double getProLevelNum(String pro) {
-    if (pro == null) {
-      return 1.0;
-    }
-    Double proNum;
-    Pattern p = Pattern.compile(".*[a-zA-Z].*");
-    if (pro.matches("[0-9]{1}[a-zA-Z]{1}")) {
-      proNum = Double.parseDouble(pro.substring(0, 1));
-    } else if (p.matcher(pro).find()) {
-      proNum = 1.0;
-    } else {
-      proNum = Double.parseDouble(pro);
-    }
-
-    return proNum;
-  }
-
-  private double parseDouble(String strNumber) {
-    if (strNumber != null && strNumber.length() > 0) {
-      try {
-        return Double.parseDouble(strNumber);
-      } catch (Exception e) {
-        return -1;
-      }
-    } else
-      return 0;
-  }
-}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java
index e2b1f38..a4a79e0 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/pre/SessionCooccurence.java
@@ -68,11 +68,10 @@ public Object execute() {
     JavaPairRDD<String, List<String>> sessionDatasetRDD = extractor.bulidSessionDatasetRDD(props, es, spark);
 
     // remove retired datasets
-    JavaPairRDD<String, List<String>> sessionFiltedDatasetsRDD = removeRetiredDataset(es, sessionDatasetRDD);
-    LabeledRowMatrix datasetSessionMatrix = MatrixUtil.createWordDocMatrix(sessionFiltedDatasetsRDD);
+    LabeledRowMatrix datasetSessionMatrix = MatrixUtil.createWordDocMatrix(sessionDatasetRDD);
 
     // export
-    MatrixUtil.exportToCSV(datasetSessionMatrix.rowMatrix, datasetSessionMatrix.rowkeys, datasetSessionMatrix.colkeys, props.getProperty("session_metadata_Matrix"));
+    MatrixUtil.exportToCSV(datasetSessionMatrix.rowMatrix, datasetSessionMatrix.rowkeys, datasetSessionMatrix.colkeys, props.getProperty(MudrodConstants.METADATA_SESSION_MATRIX_PATH));
 
     endTime = System.currentTimeMillis();
 
@@ -139,7 +138,7 @@ public Object execute(Object o) {
     while (true) {
       for (SearchHit hit : scrollResp.getHits().getHits()) {
         Map<String, Object> metadata = hit.getSource();
-        String shortName = (String) metadata.get("Dataset-ShortName");
+        String shortName = (String) metadata.get(props.getProperty(MudrodConstants.METADATA_ID));
         shortnameMap.put(shortName.toLowerCase(), shortName);
       }
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java
index 7a288a4..311b21a 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/AbstractBasedSimilarity.java
@@ -16,6 +16,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.semantics.SVDAnalyzer;
 import org.apache.sdap.mudrod.utils.LinkageTriple;
 import org.slf4j.Logger;
@@ -49,18 +50,11 @@ public Object execute() {
     startTime = System.currentTimeMillis();
 
     try {
-      /*String topicMatrixFile = props.getProperty("metadata_term_tfidf_matrix");
-      SemanticAnalyzer analyzer = new SemanticAnalyzer(props, es, spark);
-      List<LinkageTriple> triples = analyzer
-          .calTermSimfromMatrix(topicMatrixFile);
-      analyzer.saveToES(triples, props.getProperty("indexName"),
-          props.getProperty("metadataTermTFIDFSimType"), true, true);*/
-
       // for comparison
       SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
-      svd.getSVDMatrix(props.getProperty("metadata_word_tfidf_matrix"), 150, props.getProperty("metadata_word_tfidf_matrix"));
-      List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty("metadata_word_tfidf_matrix"));
-      svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("metadataWordTFIDFSimType"), true, true);
+      svd.getSVDMatrix(props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH), 150, props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
+      List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty(MudrodConstants.METADATA_WORD_MATRIX_PATH));
+      svd.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.METADATA_WORD_SIM_TYPE, true, true);
 
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/FeatureBasedSimilarity.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/FeatureBasedSimilarity.java
new file mode 100644
index 0000000..4be7ff9
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/FeatureBasedSimilarity.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing, processing, and data structure used
+ * by recommendation module.
+ */
+package org.apache.sdap.mudrod.recommendation.process;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.recommendation.structure.MetadataFeature;
+import org.apache.sdap.mudrod.recommendation.structure.PODAACMetadataFeature;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.Properties;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+public class FeatureBasedSimilarity extends DiscoveryStepAbstract implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(FeatureBasedSimilarity.class);
+
+  private DecimalFormat df = new DecimalFormat("#.000");
+  // a map from variable to its type
+  MetadataFeature metadata = null;
+  public Map<String, Integer> variableTypes;
+  public Map<String, Integer> variableWeights;
+
+
+  // index name
+  private String indexName;
+  // type name of metadata in ES
+  private String metadataType;
+  private String variableSimType;
+
+  /**
+   * Creates a new instance of FeatureBasedSimilarity.
+   *
+   * @param props the Mudrod configuration
+   * @param es    an instantiated {@link ESDriver}
+   * @param spark an instantiated {@link SparkDriver}
+   */
+  public FeatureBasedSimilarity(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+
+    indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
+    metadataType = MudrodConstants.RECOM_METADATA_TYPE;
+    variableSimType = MudrodConstants.METADATA_FEATURE_SIM_TYPE;
+ 
+    // IMPORTANT: replace with a different MetadataFeature implementation when using non-PO.DAAC metadata
+    metadata = new PODAACMetadataFeature();
+    metadata.inital();
+    variableTypes = metadata.featureTypes;
+    variableWeights = metadata.featureWeights;
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("*****************calculating metadata feature based similarity starts******************");
+    startTime = System.currentTimeMillis();
+    es.deleteType(indexName, variableSimType);
+    addMapping(es, indexName, variableSimType);
+
+    featureSimilarity(es);
+    es.refreshIndex();
+    normalizeVariableWeight(es);
+    es.refreshIndex();
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************calculating metadata feature based similarity ends******************Took {}s", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public void featureSimilarity(ESDriver es) {
+
+    es.createBulkProcessor();
+
+    List<Map<String, Object>> metadatas = new ArrayList<>();
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> metadataA = hit.getSource();
+        metadatas.add(metadataA);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    int size = metadatas.size();
+
+    for (int i = 0; i < size; i++) {
+      Map<String, Object> metadataA = metadatas.get(i);
+      String shortNameA = (String) metadataA.get(props.getProperty(MudrodConstants.METADATA_ID));
+      for (int j = 0; j < size; j++) {
+    	metadataA = metadatas.get(i);
+        Map<String, Object> metadataB = metadatas.get(j);
+        String shortNameB = (String) metadataB.get(props.getProperty(MudrodConstants.METADATA_ID));
+
+        try {
+          XContentBuilder contentBuilder = jsonBuilder().startObject();
+          contentBuilder.field("concept_A", shortNameA);
+          contentBuilder.field("concept_B", shortNameB);
+
+          // feature similarity
+          metadata.featureSimilarity(metadataA, metadataB, contentBuilder);
+
+          contentBuilder.endObject();
+
+          IndexRequest ir = new IndexRequest(indexName, variableSimType).source(contentBuilder);
+          es.getBulkProcessor().add(ir);
+
+        } catch (IOException e1) {
+          e1.printStackTrace();
+        }
+
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+
+  public static void addMapping(ESDriver es, String index, String type) {
+    XContentBuilder Mapping;
+    try {
+      Mapping = jsonBuilder().startObject().startObject(type).startObject("properties").startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject()
+          .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
+
+          .endObject().endObject().endObject();
+
+      es.getClient().admin().indices().preparePutMapping(index).setType(type).setSource(Mapping).execute().actionGet();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void normalizeVariableWeight(ESDriver es) {
+
+    es.createBulkProcessor();
+
+    double totalWeight = 0.0;
+    for (String variable : variableWeights.keySet()) {
+      totalWeight += variableWeights.get(variable);
+    }
+
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(variableSimType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> similarities = hit.getSource();
+
+        double totalSim = 0.0;
+        for (String variable : variableWeights.keySet()) {
+          if (similarities.containsKey(variable + "_Sim")) {
+            double value = (double) similarities.get(variable + "_Sim");
+            double weight = variableWeights.get(variable);
+            totalSim += weight * value;
+          }
+        }
+
+        double weight = totalSim / totalWeight;
+        UpdateRequest ur = es.generateUpdateRequest(indexName, variableSimType, hit.getId(), "weight", weight);
+        es.getBulkProcessor().add(ur);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/SessionBasedCF.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/SessionBasedCF.java
index 5ea461b..c59db6f 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/SessionBasedCF.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/SessionBasedCF.java
@@ -16,6 +16,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.semantics.SemanticAnalyzer;
 import org.apache.sdap.mudrod.utils.LinkageTriple;
 import org.apache.sdap.mudrod.utils.SimilarityUtil;
@@ -53,12 +54,12 @@ public Object execute() {
     startTime = System.currentTimeMillis();
 
     try {
-      String session_metadatFile = props.getProperty("session_metadata_Matrix");
+      String session_metadatFile = props.getProperty(MudrodConstants.METADATA_SESSION_MATRIX_PATH);
       File f = new File(session_metadatFile);
       if (f.exists()) {
         SemanticAnalyzer analyzer = new SemanticAnalyzer(props, es, spark);
         List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(session_metadatFile, SimilarityUtil.SIM_PEARSON, 1);
-        analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataSessionBasedSimType"), true, false);
+        analyzer.saveToES(triples, props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.METADATA_SESSION_SIM_TYPE, true, false);
       }
 
     } catch (Exception e) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java
deleted file mode 100644
index 9bf0884..0000000
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/process/VariableBasedSimilarity.java
+++ /dev/null
@@ -1,375 +0,0 @@
-package org.apache.sdap.mudrod.recommendation.process;
-
-import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
-import org.apache.sdap.mudrod.driver.ESDriver;
-import org.apache.sdap.mudrod.driver.SparkDriver;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-public class VariableBasedSimilarity extends DiscoveryStepAbstract implements Serializable {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(VariableBasedSimilarity.class);
-
-  // a map from variable to its type
-  public Map<String, Integer> variableTypes;
-  public Map<String, Integer> variableWeights;
-
-  private static final Integer VAR_CATEGORICAL = 3;
-  private static final Integer VAR_ORDINAL = 4;
-
-  // index name
-  private String indexName;
-  // type name of metadata in ES
-  private String metadataType;
-  private String variableSimType;
-
-  /**
-   * Creates a new instance of OHEncoder.
-   *
-   * @param props the Mudrod configuration
-   * @param es    an instantiated {@link ESDriver}
-   * @param spark an instantiated {@link SparkDriver}
-   */
-  public VariableBasedSimilarity(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-
-    indexName = props.getProperty("indexName");
-    metadataType = props.getProperty("recom_metadataType");
-    variableSimType = props.getProperty("metadataCodeSimType");
-    this.inital();
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("*****************calculating metadata variables based similarity starts******************");
-    startTime = System.currentTimeMillis();
-    es.deleteType(indexName, variableSimType);
-    addMapping(es, indexName, variableSimType);
-
-    VariableBasedSimilarity(es);
-    es.refreshIndex();
-    normalizeVariableWeight(es);
-    es.refreshIndex();
-    endTime = System.currentTimeMillis();
-    LOG.info("*****************calculating metadata variables based similarity ends******************Took {}s", (endTime - startTime) / 1000);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  public void inital() {
-    this.initVariableType();
-    this.initVariableWeight();
-  }
-
-  private void initVariableType() {
-    variableTypes = new HashMap<>();
-
-    variableTypes.put("DatasetParameter-Variable", VAR_CATEGORICAL);
-    variableTypes.put("DatasetRegion-Region", VAR_CATEGORICAL);
-    variableTypes.put("Dataset-ProjectionType", VAR_CATEGORICAL);
-    variableTypes.put("Dataset-ProcessingLevel", VAR_CATEGORICAL);
-    variableTypes.put("DatasetParameter-Topic", VAR_CATEGORICAL);
-    variableTypes.put("DatasetParameter-Term", VAR_CATEGORICAL);
-    variableTypes.put("DatasetParameter-Category", VAR_CATEGORICAL);
-    variableTypes.put("DatasetPolicy-DataFormat", VAR_CATEGORICAL);
-    variableTypes.put("Collection-ShortName", VAR_CATEGORICAL);
-    variableTypes.put("DatasetSource-Source-Type", VAR_CATEGORICAL);
-    variableTypes.put("DatasetSource-Source-ShortName", VAR_CATEGORICAL);
-    variableTypes.put("DatasetSource-Sensor-ShortName", VAR_CATEGORICAL);
-    variableTypes.put("DatasetPolicy-Availability", VAR_CATEGORICAL);
-    variableTypes.put("Dataset-Provider-ShortName", VAR_CATEGORICAL);
-
-    variableTypes.put("Dataset-Derivative-ProcessingLevel", VAR_ORDINAL);
-    variableTypes.put("Dataset-Derivative-TemporalResolution", VAR_ORDINAL);
-    variableTypes.put("Dataset-Derivative-SpatialResolution", VAR_ORDINAL);
-  }
-
-  private void initVariableWeight() {
-    variableWeights = new HashMap<>();
-
-    variableWeights.put("Dataset-Derivative-ProcessingLevel", 5);
-    variableWeights.put("DatasetParameter-Category", 5);
-    variableWeights.put("DatasetParameter-Variable", 5);
-    variableWeights.put("DatasetSource-Sensor-ShortName", 5);
-
-    variableWeights.put("DatasetPolicy-Availability", 4);
-    variableWeights.put("DatasetRegion-Region", 4);
-    variableWeights.put("DatasetSource-Source-Type", 4);
-    variableWeights.put("DatasetSource-Source-ShortName", 4);
-    variableWeights.put("DatasetParameter-Term", 4);
-    variableWeights.put("DatasetPolicy-DataFormat", 4);
-    variableWeights.put("Dataset-Derivative-SpatialResolution", 4);
-    variableWeights.put("Temporal_Covergae", 4);
-
-    variableWeights.put("DatasetParameter-Topic", 3);
-    variableWeights.put("Collection-ShortName", 3);
-    variableWeights.put("Dataset-Derivative-TemporalResolution", 3);
-    variableWeights.put("Spatial_Covergae", 3);
-
-    variableWeights.put("Dataset-ProjectionType", 1);
-    variableWeights.put("Dataset-Provider-ShortName", 1);
-  }
-
-  public void VariableBasedSimilarity(ESDriver es) {
-
-    es.createBulkProcessor();
-
-    List<Map<String, Object>> metadatas = new ArrayList<>();
-    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
-        .actionGet();
-    while (true) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        Map<String, Object> metadataA = hit.getSource();
-        metadatas.add(metadataA);
-      }
-
-      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
-      if (scrollResp.getHits().getHits().length == 0) {
-        break;
-      }
-    }
-
-    for (Map<String, Object> metadataA : metadatas) {
-      String shortNameA = (String) metadataA.get("Dataset-ShortName");
-
-      for (Map<String, Object> metadataB : metadatas) {
-        String shortNameB = (String) metadataB.get("Dataset-ShortName");
-
-        try {
-          XContentBuilder contentBuilder = jsonBuilder().startObject();
-          contentBuilder.field("concept_A", shortNameA);
-          contentBuilder.field("concept_B", shortNameB);
-
-          // spatial similarity
-          this.spatialSimilarity(metadataA, metadataB, contentBuilder);
-          // temporal similarity
-          this.temporalSimilarity(metadataA, metadataB, contentBuilder);
-          // categorical variables similarity
-          this.categoricalVariablesSimilarity(metadataA, metadataB, contentBuilder);
-          // ordinal variables similarity
-          this.ordinalVariablesSimilarity(metadataA, metadataB, contentBuilder);
-
-          contentBuilder.endObject();
-
-          IndexRequest ir = new IndexRequest(indexName, variableSimType).source(contentBuilder);
-          es.getBulkProcessor().add(ir);
-
-        } catch (IOException e1) {
-          e1.printStackTrace();
-        }
-
-      }
-    }
-
-    es.destroyBulkProcessor();
-  }
-
-  /*
-   * refer to P. Frontiera, R. Larson, and J. Radke (2008) A comparison of
-     geometric approaches to assessing spatial similarity for GIR.
-     International Journal of Geographical Information Science,
-     22(3)
-   */
-  public void spatialSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
-
-    double topA = (double) metadataA.get("DatasetCoverage-Derivative-NorthLat");
-    double bottomA = (double) metadataA.get("DatasetCoverage-Derivative-SouthLat");
-    double leftA = (double) metadataA.get("DatasetCoverage-Derivative-WestLon");
-    double rightA = (double) metadataA.get("DatasetCoverage-Derivative-EastLon");
-    double areaA = (double) metadataA.get("DatasetCoverage-Derivative-Area");
-
-    double topB = (double) metadataB.get("DatasetCoverage-Derivative-NorthLat");
-    double bottomB = (double) metadataB.get("DatasetCoverage-Derivative-SouthLat");
-    double leftB = (double) metadataB.get("DatasetCoverage-Derivative-WestLon");
-    double rightB = (double) metadataB.get("DatasetCoverage-Derivative-EastLon");
-    double areaB = (double) metadataB.get("DatasetCoverage-Derivative-Area");
-
-    // Intersect area
-    double xOverlap = Math.max(0, Math.min(rightA, rightB) - Math.max(leftA, leftB));
-    double yOverlap = Math.max(0, Math.min(topA, topB) - Math.max(bottomA, bottomB));
-    double overlapArea = xOverlap * yOverlap;
-
-    // Calculate coverage similarity
-    double similarity = 0.0;
-    if (areaA > 0 && areaB > 0) {
-      similarity = (overlapArea / areaA + overlapArea / areaB) * 0.5;
-    }
-
-    contentBuilder.field("Spatial_Covergae_Sim", similarity);
-  }
-
-  public void temporalSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
-
-    double similarity;
-    double startTimeA = Double.parseDouble((String) metadataA.get("Dataset-DatasetCoverage-StartTimeLong"));
-    String endTimeAStr = (String) metadataA.get("Dataset-DatasetCoverage-StopTimeLong");
-    double endTimeA;
-    if ("".equals(endTimeAStr)) {
-      endTimeA = System.currentTimeMillis();
-    } else {
-      endTimeA = Double.parseDouble(endTimeAStr);
-    }
-    double timespanA = endTimeA - startTimeA;
-
-    double startTimeB = Double.parseDouble((String) metadataB.get("Dataset-DatasetCoverage-StartTimeLong"));
-    String endTimeBStr = (String) metadataB.get("Dataset-DatasetCoverage-StopTimeLong");
-    double endTimeB;
-    if ("".equals(endTimeBStr)) {
-      endTimeB = System.currentTimeMillis();
-    } else {
-      endTimeB = Double.parseDouble(endTimeBStr);
-    }
-    double timespanB = endTimeB - startTimeB;
-
-    double intersect;
-    if (startTimeB >= endTimeA || endTimeB <= startTimeA) {
-      intersect = 0.0;
-    } else if (startTimeB >= startTimeA && endTimeB <= endTimeA) {
-      intersect = timespanB;
-    } else if (startTimeA >= startTimeB && endTimeA <= endTimeB) {
-      intersect = timespanA;
-    } else {
-      intersect = (startTimeA > startTimeB) ? (endTimeB - startTimeA) : (endTimeA - startTimeB);
-    }
-
-    similarity = intersect / (Math.sqrt(timespanA) * Math.sqrt(timespanB));
-    contentBuilder.field("Temporal_Covergae_Sim", similarity);
-  }
-
-  public void categoricalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
-
-    for (String variable : variableTypes.keySet()) {
-      Integer type = variableTypes.get(variable);
-      if (type != VAR_CATEGORICAL) {
-        continue;
-      }
-
-      double similarity = 0.0;
-      Object valueA = metadataA.get(variable);
-      Object valueB = metadataB.get(variable);
-      if (valueA instanceof ArrayList) {
-        ArrayList<String> aList = (ArrayList<String>) valueA;
-        ArrayList<String> bList = (ArrayList<String>) valueB;
-        if (aList != null && bList != null) {
-
-          int lengthA = aList.size();
-          List<String> newAList = new ArrayList<>(aList);
-          List<String> newBList = new ArrayList<>(bList);
-          newAList.retainAll(newBList);
-          similarity = newAList.size() / lengthA;
-        }
-
-      } else if (valueA instanceof String) {
-        if (valueA.equals(valueB)) {
-          similarity = 1.0;
-        }
-      }
-
-      contentBuilder.field(variable + "_Sim", similarity);
-    }
-  }
-
-  public void ordinalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
-    for (String variable : variableTypes.keySet()) {
-      Integer type = variableTypes.get(variable);
-      if (type != VAR_ORDINAL) {
-        continue;
-      }
-
-      double similarity = 0.0;
-      Object valueA = metadataA.get(variable);
-      Object valueB = metadataB.get(variable);
-      if (valueA != null && valueB != null) {
-
-        double a = (double) valueA;
-        double b = (double) valueB;
-        if (a != 0.0) {
-          similarity = 1 - Math.abs(b - a) / a;
-          if (similarity < 0) {
-            similarity = 0.0;
-          }
-        }
-      }
-
-      contentBuilder.field(variable + "_Sim", similarity);
-    }
-  }
-
-  public static void addMapping(ESDriver es, String index, String type) {
-    XContentBuilder Mapping;
-    try {
-      Mapping = jsonBuilder().startObject().startObject(type).startObject("properties").startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject()
-          .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
-
-          .endObject().endObject().endObject();
-
-      es.getClient().admin().indices().preparePutMapping(index).setType(type).setSource(Mapping).execute().actionGet();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  public void normalizeVariableWeight(ESDriver es) {
-
-    es.createBulkProcessor();
-
-    double totalWeight = 0.0;
-    for (String variable : variableWeights.keySet()) {
-      totalWeight += variableWeights.get(variable);
-    }
-
-    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(variableSimType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
-        .actionGet();
-    while (true) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        Map<String, Object> similarities = hit.getSource();
-
-        double totalSim = 0.0;
-        for (String variable : variableWeights.keySet()) {
-          if (similarities.containsKey(variable + "_Sim")) {
-            double value = (double) similarities.get(variable + "_Sim");
-            double weight = variableWeights.get(variable);
-            totalSim += weight * value;
-          }
-        }
-
-        double weight = totalSim / totalWeight;
-        UpdateRequest ur = es.generateUpdateRequest(indexName, variableSimType, hit.getId(), "weight", weight);
-        es.getBulkProcessor().add(ur);
-      }
-
-      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
-      if (scrollResp.getHits().getHits().length == 0) {
-        break;
-      }
-    }
-
-    es.destroyBulkProcessor();
-  }
-}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java
index f38f8ed..2b29c03 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java
@@ -20,6 +20,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.main.MudrodEngine;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -45,7 +46,7 @@
   // format decimal
   DecimalFormat df = new DecimalFormat("#.00");
   // index name
-  protected static final String INDEX_NAME = "indexName";
+  protected static final String INDEX_NAME = MudrodConstants.ES_INDEX_NAME;
   private static final String WEIGHT = "weight";
 
   /**
@@ -87,13 +88,13 @@ public Object execute(Object o) {
   public JsonObject getRecomDataInJson(String input, int num) {
     JsonObject resultJson = new JsonObject();
 
-    String type = props.getProperty("metadataCodeSimType");
+    String type = MudrodConstants.METADATA_FEATURE_SIM_TYPE;
     Map<String, Double> sortedVariableSimMap = getRelatedData(type, input, num + 10);
 
-    type = props.getProperty("metadataWordTFIDFSimType");
+    type = MudrodConstants.METADATA_WORD_SIM_TYPE;
     Map<String, Double> sortedAbstractSimMap = getRelatedData(type, input, num + 10);
 
-    type = props.getProperty("metadataSessionBasedSimType");
+    type = MudrodConstants.METADATA_SESSION_SIM_TYPE;
     Map<String, Double> sortedSessionSimMap = getRelatedData(type, input, num + 10);
 
     JsonElement variableSimJson = mapToJson(sortedVariableSimMap, num);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataFeature.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataFeature.java
new file mode 100644
index 0000000..a36b9df
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataFeature.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing, processing, and data structure used
+ * by recommendation module.
+ */
+package org.apache.sdap.mudrod.recommendation.structure;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class MetadataFeature implements Serializable {
+	
+	protected static final Integer VAR_SPATIAL = 1;
+	protected static final Integer VAR_TEMPORAL = 2;
+	protected static final Integer VAR_CATEGORICAL = 3;
+	protected static final Integer VAR_ORDINAL = 4;
+	
+	public Map<String, Integer> featureTypes = new HashMap<>();
+	public Map<String, Integer> featureWeights = new HashMap<>();
+
+	public void normalizeMetadataVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+
+		this.normalizeSpatialVariables(metadata, updatedValues);
+		this.normalizeTemporalVariables(metadata, updatedValues);
+		this.normalizeOtherVariables(metadata, updatedValues);
+	}
+	
+	public void inital() {
+	    this.initFeatureType();
+	    this.initFeatureWeight();
+	}
+	
+	public void featureSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) {
+		 this.spatialSimilarity(metadataA, metadataB, contentBuilder);
+		 this.temporalSimilarity(metadataA, metadataB, contentBuilder);
+		 this.categoricalVariablesSimilarity(metadataA, metadataB, contentBuilder);
+		 this.ordinalVariablesSimilarity(metadataA, metadataB, contentBuilder);
+	}
+
+	/* for normalization */
+	public abstract void normalizeSpatialVariables(Map<String, Object> metadata, Map<String, Object> updatedValues);
+
+	public abstract void normalizeTemporalVariables(Map<String, Object> metadata, Map<String, Object> updatedValues);
+
+	public abstract void normalizeOtherVariables(Map<String, Object> metadata, Map<String, Object> updatedValues);
+
+	/* for similarity */
+	public abstract void initFeatureType();
+
+	public abstract void initFeatureWeight();
+	
+	public abstract void spatialSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder);
+  
+	public abstract void temporalSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder);
+
+	public abstract void categoricalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder);
+  
+	public abstract void ordinalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder);
+}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataTokenizer.java
similarity index 73%
rename from core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java
rename to core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataTokenizer.java
index cda8d6f..5b93180 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataOpt.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/MetadataTokenizer.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing, processing, and data structure used
+ * by recommendation module.
+ */
 package org.apache.sdap.mudrod.recommendation.structure;
 
 import org.apache.sdap.mudrod.driver.ESDriver;
@@ -16,13 +33,14 @@
 import scala.Tuple2;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.Properties;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
-public class MetadataOpt implements Serializable {
 
-  /**
-   * 
-   */
+public class MetadataTokenizer implements Serializable {
   private static final long serialVersionUID = 1L;
   private String indexName;
   private String metadataType;
@@ -31,24 +49,21 @@
   public static final String SPLIT_BLANK = " ";
   public static final String SPLIT_COMMA = ",";
 
-  public MetadataOpt(Properties props) {
+  public MetadataTokenizer(Properties props) {
     indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
-    metadataType = props.getProperty("recom_metadataType");
-
-    variables = new ArrayList<>();
-    variables.add("DatasetParameter-Term");
-    variables.add("DatasetParameter-Variable");
-    variables.add("Dataset-Description");
-    variables.add("Dataset-LongName");
+    metadataType = MudrodConstants.RECOM_METADATA_TYPE;
+    
+    String source = props.getProperty(MudrodConstants.SEMANTIC_FIELDS);
+    variables = new ArrayList<String>(Arrays.asList(source.split(",")));
   }
 
-  public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark) throws Exception {
-    List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables);
+  public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark, String metadataName) throws Exception {
+    List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables, metadataName);
     return this.parallizeData(spark, datasetsTokens);
   }
 
-  public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark, List<String> variables) throws Exception {
-    List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables);
+  public JavaPairRDD<String, String> loadAll(ESDriver es, SparkDriver spark, List<String> variables, String metadataName) throws Exception {
+    List<Tuple2<String, String>> datasetsTokens = this.loadMetadataFromES(es, variables, metadataName);
     return this.parallizeData(spark, datasetsTokens);
   }
 
@@ -99,7 +114,7 @@ public MetadataOpt(Properties props) {
     return java.util.Arrays.asList(tokens);
   }
 
-  public List<Tuple2<String, String>> loadMetadataFromES(ESDriver es, List<String> variables) throws Exception {
+  public List<Tuple2<String, String>> loadMetadataFromES(ESDriver es, List<String> variables, String metadataName) throws Exception {
 
     SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setQuery(QueryBuilders.matchAllQuery()).setScroll(new TimeValue(60000)).setSize(100).execute()
         .actionGet();
@@ -109,10 +124,12 @@ public MetadataOpt(Properties props) {
 
       for (SearchHit hit : scrollResp.getHits().getHits()) {
         Map<String, Object> result = hit.getSource();
-        String shortName = (String) result.get("Dataset-ShortName");
+        String shortName = (String) result.get(metadataName);
 
         String filedStr = "";
-        for (String filed : variables) {
+        int size = variables.size();
+        for (int i = 0; i < size; i++) {
+          String filed = variables.get(i);
           Object filedValue = result.get(filed);
 
           if (filedValue != null) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/PODAACMetadataFeature.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/PODAACMetadataFeature.java
new file mode 100644
index 0000000..f988678
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/PODAACMetadataFeature.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing, processing, and data structure used
+ * by recommendation module.
+ */
+package org.apache.sdap.mudrod.recommendation.structure;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class PODAACMetadataFeature extends MetadataFeature {
+
+	public void normalizeSpatialVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+
+		// get spatial resolution
+		Double spatialR;
+		if (metadata.get("Dataset-SatelliteSpatialResolution") != null) {
+			spatialR = (Double) metadata.get("Dataset-SatelliteSpatialResolution");
+		} else {
+			Double gridR = (Double) metadata.get("Dataset-GridSpatialResolution");
+			if (gridR != null) {
+				spatialR = 111 * gridR;
+			} else {
+				spatialR = 25.0;
+			}
+		}
+		updatedValues.put("Dataset-Derivative-SpatialResolution", spatialR);
+
+		// Transform Longitude and calculate coverage area
+		double top = parseDouble((String) metadata.get("DatasetCoverage-NorthLat"));
+		double bottom = parseDouble((String) metadata.get("DatasetCoverage-SouthLat"));
+		double left = parseDouble((String) metadata.get("DatasetCoverage-WestLon"));
+		double right = parseDouble((String) metadata.get("DatasetCoverage-EastLon"));
+
+		if (left > 180) {
+			left = left - 360;
+		}
+
+		if (right > 180) {
+			right = right - 360;
+		}
+
+		if (left == right) {
+			left = -180;
+			right = 180;
+		}
+
+		double area = (top - bottom) * (right - left);
+
+		updatedValues.put("DatasetCoverage-Derivative-EastLon", right);
+		updatedValues.put("DatasetCoverage-Derivative-WestLon", left);
+		updatedValues.put("DatasetCoverage-Derivative-NorthLat", top);
+		updatedValues.put("DatasetCoverage-Derivative-SouthLat", bottom);
+		updatedValues.put("DatasetCoverage-Derivative-Area", area);
+
+		// get processing level
+		String processingLevel = (String) metadata.get("Dataset-ProcessingLevel");
+		double dProLevel = this.getProLevelNum(processingLevel);
+		updatedValues.put("Dataset-Derivative-ProcessingLevel", dProLevel);
+	}
+
+	public void normalizeTemporalVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+		String trStr = (String) metadata.get("Dataset-TemporalResolution");
+		if ("".equals(trStr)) {
+			trStr = (String) metadata.get("Dataset-TemporalRepeat");
+		}
+
+		updatedValues.put("Dataset-Derivative-TemporalResolution", covertTimeUnit(trStr));
+	}
+
+	public void normalizeOtherVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+		String shortname = (String) metadata.get("Dataset-ShortName");
+		double versionNUm = getVersionNum(shortname);
+		updatedValues.put("Dataset-Derivative-VersionNum", versionNUm);
+	}
+
+	private Double covertTimeUnit(String str) {
+		Double timeInHour;
+		if (str.contains("Hour")) {
+			timeInHour = Double.parseDouble(str.split(" ")[0]);
+		} else if (str.contains("Day")) {
+			timeInHour = Double.parseDouble(str.split(" ")[0]) * 24;
+		} else if (str.contains("Week")) {
+			timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7;
+		} else if (str.contains("Month")) {
+			timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30;
+		} else if (str.contains("Year")) {
+			timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30 * 365;
+		} else {
+			timeInHour = 0.0;
+		}
+
+		return timeInHour;
+	}
+
+	private double parseDouble(String strNumber) {
+		if (strNumber != null && strNumber.length() > 0) {
+			try {
+				return Double.parseDouble(strNumber);
+			} catch (Exception e) {
+				return -1;
+			}
+		} else
+			return 0;
+	}
+
+	public Double getProLevelNum(String pro) {
+		if (pro == null) {
+			return 1.0;
+		}
+		Double proNum = 0.0;
+		Pattern p = Pattern.compile(".*[a-zA-Z].*");
+		if (pro.matches("[0-9]{1}[a-zA-Z]{1}")) {
+			proNum = Double.parseDouble(pro.substring(0, 1));
+		} else if (p.matcher(pro).find()) {
+			proNum = 1.0;
+		} else {
+			proNum = Double.parseDouble(pro);
+		}
+
+		return proNum;
+	}
+
+	private Double getVersionNum(String version) {
+		if (version == null) {
+			return 0.0;
+		}
+		Double versionNum = 0.0;
+		Pattern p = Pattern.compile(".*[a-zA-Z].*");
+		if ("Operational/Near-Real-Time".equals(version)) {
+			versionNum = 2.0;
+		} else if (version.matches("[0-9]{1}[a-zA-Z]{1}")) {
+			versionNum = Double.parseDouble(version.substring(0, 1));
+		} else if (p.matcher(version).find()) {
+			versionNum = 0.0;
+		} else {
+			versionNum = Double.parseDouble(version);
+			if (versionNum >= 5) {
+				versionNum = 20.0;
+			}
+		}
+		return versionNum;
+	}
+
+	@Override
+	public void initFeatureType() {
+		// TODO Auto-generated method stub
+		//Map<String, Integer> featureTypes = new HashMap<>();
+		featureTypes.put("DatasetParameter-Variable", VAR_CATEGORICAL);
+		featureTypes.put("DatasetRegion-Region", VAR_CATEGORICAL);
+		featureTypes.put("Dataset-ProjectionType", VAR_CATEGORICAL);
+		featureTypes.put("Dataset-ProcessingLevel", VAR_CATEGORICAL);
+		featureTypes.put("DatasetParameter-Topic", VAR_CATEGORICAL);
+		featureTypes.put("DatasetParameter-Term", VAR_CATEGORICAL);
+		featureTypes.put("DatasetParameter-Category", VAR_CATEGORICAL);
+		featureTypes.put("DatasetPolicy-DataFormat", VAR_CATEGORICAL);
+		featureTypes.put("Collection-ShortName", VAR_CATEGORICAL);
+		featureTypes.put("DatasetSource-Source-Type", VAR_CATEGORICAL);
+		featureTypes.put("DatasetSource-Source-ShortName", VAR_CATEGORICAL);
+		featureTypes.put("DatasetSource-Sensor-ShortName", VAR_CATEGORICAL);
+		featureTypes.put("DatasetPolicy-Availability", VAR_CATEGORICAL);
+		featureTypes.put("Dataset-Provider-ShortName", VAR_CATEGORICAL);
+		featureTypes.put("Dataset-Derivative-ProcessingLevel", VAR_ORDINAL);
+		featureTypes.put("Dataset-Derivative-TemporalResolution", VAR_ORDINAL);
+		featureTypes.put("Dataset-Derivative-SpatialResolution", VAR_ORDINAL);
+	}
+
+	@Override
+	public void initFeatureWeight() {
+		// TODO Auto-generated method stub
+		//Map<String, Integer> featureWeights = new HashMap<>();
+		featureWeights.put("Dataset-Derivative-ProcessingLevel", 5);
+		featureWeights.put("DatasetParameter-Category", 5);
+		featureWeights.put("DatasetParameter-Variable", 5);
+		featureWeights.put("DatasetSource-Sensor-ShortName", 5);
+		featureWeights.put("DatasetPolicy-Availability", 4);
+		featureWeights.put("DatasetRegion-Region", 4);
+		featureWeights.put("DatasetSource-Source-Type", 4);
+		featureWeights.put("DatasetSource-Source-ShortName", 4);
+		featureWeights.put("DatasetParameter-Term", 4);
+		featureWeights.put("DatasetPolicy-DataFormat", 4);
+		featureWeights.put("Dataset-Derivative-SpatialResolution", 4);
+		featureWeights.put("Temporal_Covergae", 4);
+		featureWeights.put("DatasetParameter-Topic", 3);
+		featureWeights.put("Collection-ShortName", 3);
+		featureWeights.put("Dataset-Derivative-TemporalResolution", 3);
+		featureWeights.put("Spatial_Covergae", 3);
+		featureWeights.put("Dataset-ProjectionType", 1);
+		featureWeights.put("Dataset-Provider-ShortName", 1);
+	}
+
+	@Override
+	public void spatialSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB,
+			XContentBuilder contentBuilder) {
+		// TODO Auto-generated method stub
+		double topA = (double) metadataA.get("DatasetCoverage-Derivative-NorthLat");
+		double bottomA = (double) metadataA.get("DatasetCoverage-Derivative-SouthLat");
+		double leftA = (double) metadataA.get("DatasetCoverage-Derivative-WestLon");
+		double rightA = (double) metadataA.get("DatasetCoverage-Derivative-EastLon");
+		double areaA = (double) metadataA.get("DatasetCoverage-Derivative-Area");
+
+		double topB = (double) metadataB.get("DatasetCoverage-Derivative-NorthLat");
+		double bottomB = (double) metadataB.get("DatasetCoverage-Derivative-SouthLat");
+		double leftB = (double) metadataB.get("DatasetCoverage-Derivative-WestLon");
+		double rightB = (double) metadataB.get("DatasetCoverage-Derivative-EastLon");
+		double areaB = (double) metadataB.get("DatasetCoverage-Derivative-Area");
+
+		// Intersect area
+		double xOverlap = Math.max(0, Math.min(rightA, rightB) - Math.max(leftA, leftB));
+		double yOverlap = Math.max(0, Math.min(topA, topB) - Math.max(bottomA, bottomB));
+		double overlapArea = xOverlap * yOverlap;
+
+		// Calculate coverage similarity
+		double similarity = 0.0;
+		if (areaA > 0 && areaB > 0) {
+			similarity = (overlapArea / areaA + overlapArea / areaB) * 0.5;
+		}
+
+		try {
+			contentBuilder.field("Spatial_Covergae_Sim", similarity);
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	public void temporalSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB,
+			XContentBuilder contentBuilder) {
+		// TODO Auto-generated method stub
+		double similarity = 0.0;
+		double startTimeA = Double.parseDouble((String) metadataA.get("Dataset-DatasetCoverage-StartTimeLong"));
+		String endTimeAStr = (String) metadataA.get("Dataset-DatasetCoverage-StopTimeLong");
+		double endTimeA = 0.0;
+		if ("".equals(endTimeAStr)) {
+			endTimeA = System.currentTimeMillis();
+		} else {
+			endTimeA = Double.parseDouble(endTimeAStr);
+		}
+		double timespanA = endTimeA - startTimeA;
+
+		double startTimeB = Double.parseDouble((String) metadataB.get("Dataset-DatasetCoverage-StartTimeLong"));
+		String endTimeBStr = (String) metadataB.get("Dataset-DatasetCoverage-StopTimeLong");
+		double endTimeB = 0.0;
+		if ("".equals(endTimeBStr)) {
+			endTimeB = System.currentTimeMillis();
+		} else {
+			endTimeB = Double.parseDouble(endTimeBStr);
+		}
+		double timespanB = endTimeB - startTimeB;
+
+		double intersect = 0.0;
+		if (startTimeB >= endTimeA || endTimeB <= startTimeA) {
+			intersect = 0.0;
+		} else if (startTimeB >= startTimeA && endTimeB <= endTimeA) {
+			intersect = timespanB;
+		} else if (startTimeA >= startTimeB && endTimeA <= endTimeB) {
+			intersect = timespanA;
+		} else {
+			intersect = (startTimeA > startTimeB) ? (endTimeB - startTimeA) : (endTimeA - startTimeB);
+		}
+
+		similarity = intersect / (Math.sqrt(timespanA) * Math.sqrt(timespanB));
+		try {
+			contentBuilder.field("Temporal_Covergae_Sim", similarity);
+		} catch (IOException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	public void categoricalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB,
+			XContentBuilder contentBuilder) {
+		// TODO Auto-generated method stub
+		for (String variable : featureTypes.keySet()) {
+			Integer type = featureTypes.get(variable);
+			if (type != VAR_CATEGORICAL) {
+				continue;
+			}
+
+			double similarity = 0.0;
+			Object valueA = metadataA.get(variable);
+			Object valueB = metadataB.get(variable);
+			if (valueA instanceof ArrayList) {
+				ArrayList<String> aList = (ArrayList<String>) valueA;
+				ArrayList<String> bList = (ArrayList<String>) valueB;
+				if (aList != null && bList != null) {
+
+					int lengthA = aList.size();
+					int lengthB = bList.size();
+					List<String> newAList = new ArrayList<>(aList);
+					List<String> newBList = new ArrayList<>(bList);
+					newAList.retainAll(newBList);
+					similarity = newAList.size() / lengthA;
+				}
+
+			} else if (valueA instanceof String) {
+				if (valueA.equals(valueB)) {
+					similarity = 1.0;
+				}
+			}
+
+			try {
+				contentBuilder.field(variable + "_Sim", similarity);
+			} catch (IOException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+	}
+
+	@Override
+	public void ordinalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB,
+			XContentBuilder contentBuilder) {
+		// TODO Auto-generated method stub
+		for (String variable : featureTypes.keySet()) {
+			Integer type = featureTypes.get(variable);
+			if (type != VAR_ORDINAL) {
+				continue;
+			}
+
+			double similarity = 0.0;
+			Object valueA = metadataA.get(variable);
+			Object valueB = metadataB.get(variable);
+			if (valueA != null && valueB != null) {
+
+				double a = (double) valueA;
+				double b = (double) valueB;
+				if (a != 0.0) {
+					similarity = 1 - Math.abs(b - a) / a;
+					if (similarity < 0) {
+						similarity = 0.0;
+					}
+				}
+			}
+
+			try {
+				contentBuilder.field(variable + "_Sim", similarity);
+			} catch (IOException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+	}
+}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java
deleted file mode 100644
index 7bdbd0d..0000000
--- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/RecomData.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sdap.mudrod.recommendation.structure;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
-import org.apache.sdap.mudrod.driver.ESDriver;
-import org.apache.sdap.mudrod.driver.SparkDriver;
-import org.apache.sdap.mudrod.main.MudrodEngine;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortOrder;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.*;
-
-/**
- * This class is used to test recommendation result similarity and session-level
- * similarity
- */
-public class RecomData extends DiscoveryStepAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  protected transient List<LinkedTerm> termList = new ArrayList<>();
-  DecimalFormat df = new DecimalFormat("#.00");
-  protected static final String INDEX_NAME = "indexName";
-  private static final String WEIGHT = "weight";
-
-  class LinkedTerm {
-    public String term = null;
-    public double weight = 0;
-    public String model = null;
-
-    public LinkedTerm(String str, double w, String m) {
-      term = str;
-      weight = w;
-      model = m;
-    }
-  }
-
-  public RecomData(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  public JsonObject getRecomDataInJson(String input, int num) {
-    String type = props.getProperty("metadataTermTFIDFSimType");
-    Map<String, Double> sortedOBSimMap = getRelatedData(type, input, num + 5);
-    JsonElement linkedJson = mapToJson(sortedOBSimMap, num);
-
-    // type = props.getProperty("metadataTermTFIDFSimType");
-    type = props.getProperty("metadataCodeSimType");
-
-    Map<String, Double> sortedMBSimMap = getRelatedData(type, input, num + 5);
-    JsonElement relatedJson = mapToJson(sortedMBSimMap, num);
-
-    JsonObject json = new JsonObject();
-
-    json.add("TFIDFSim", linkedJson);
-    json.add("TopicSim", relatedJson);
-
-    return json;
-  }
-
-  protected JsonElement mapToJson(Map<String, Double> wordweights, int num) {
-    Gson gson = new Gson();
-
-    List<JsonObject> nodes = new ArrayList<>();
-    Set<String> words = wordweights.keySet();
-    int i = 0;
-    for (String wordB : words) {
-      JsonObject node = new JsonObject();
-      node.addProperty("name", wordB);
-      node.addProperty("weight", wordweights.get(wordB));
-      nodes.add(node);
-
-      i += 1;
-      if (i >= num) {
-        break;
-      }
-    }
-
-    String nodesJson = gson.toJson(nodes);
-    JsonElement nodesElement = gson.fromJson(nodesJson, JsonElement.class);
-
-    return nodesElement;
-  }
-
-  public Map<String, Double> getRelatedData(String type, String input, int num) {
-    termList = new ArrayList<>();
-    Map<String, Double> termsMap = new HashMap<>();
-    Map<String, Double> sortedMap = new HashMap<>();
-    try {
-      List<LinkedTerm> links = getRelatedDataFromES(type, input, num);
-      for (LinkedTerm link : links) {
-        termsMap.put(link.term, link.weight);
-      }
-
-      sortedMap = sortMapByValue(termsMap); // terms_map will be empty
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-
-    return sortedMap;
-  }
-
-  public List<LinkedTerm> getRelatedDataFromES(String type, String input, int num) {
-    SearchRequestBuilder builder = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
-            .setSize(num);
-
-    SearchResponse usrhis = builder.execute().actionGet();
-
-    for (SearchHit hit : usrhis.getHits().getHits()) {
-      Map<String, Object> result = hit.getSource();
-      String conceptB = (String) result.get("concept_B");
-
-      if (!conceptB.equals(input)) {
-        LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), type);
-        termList.add(lTerm);
-      }
-    }
-
-    return termList;
-  }
-
-  public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
-    List<String> mapKeys = new ArrayList<>(passedMap.keySet());
-    List<Double> mapValues = new ArrayList<>(passedMap.values());
-    Collections.sort(mapValues, Collections.reverseOrder());
-    Collections.sort(mapKeys, Collections.reverseOrder());
-
-    LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
-
-    Iterator<Double> valueIt = mapValues.iterator();
-    while (valueIt.hasNext()) {
-      Object val = valueIt.next();
-      Iterator<String> keyIt = mapKeys.iterator();
-
-      while (keyIt.hasNext()) {
-        Object key = keyIt.next();
-        String comp1 = passedMap.get(key).toString();
-        String comp2 = val.toString();
-
-        if (comp1.equals(comp2)) {
-          passedMap.remove(key);
-          mapKeys.remove(key);
-          sortedMap.put((String) key, (Double) val);
-          break;
-        }
-      }
-    }
-    return sortedMap;
-  }
-
-  public static void main(String[] args) throws IOException {
-
-    MudrodEngine me = new MudrodEngine();
-    Properties props = me.loadConfig();
-    ESDriver es = new ESDriver(me.getConfig());
-    RecomData test = new RecomData(props, es, null);
-
-    String input = "AQUARIUS_L3_SSS_SMIA_MONTHLY-CLIMATOLOGY_V4";
-    JsonObject json = test.getRecomDataInJson(input, 10);
-
-    System.out.println(json.toString());
-  }
-}
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
index 779105f..9121719 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickstreamImporter.java
@@ -78,7 +78,7 @@ public void importfromCSVtoES() {
     String cvsSplitBy = ",";
 
     try {
-      br = new BufferedReader(new FileReader(props.getProperty("clickstreamMatrix")));
+      br = new BufferedReader(new FileReader(props.getProperty(MudrodConstants.CLICKSTREAM_PATH)));
       String line = br.readLine();
       // first item needs to be skipped
       String[] dataList = line.split(cvsSplitBy);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/Ranker.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/Ranker.java
index 98522b4..21aa646 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ssearch/Ranker.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/Ranker.java
@@ -30,19 +30,14 @@
  * Supports the ability to calculating ranking score
  */
 public class Ranker extends MudrodAbstract implements Serializable {
-  /**
-   *
-   */
   private static final long serialVersionUID = 1L;
   transient List<SResult> resultList = new ArrayList<>();
-
-  String learnerType = null;
   Learner le = null;
 
-  public Ranker(Properties props, ESDriver es, SparkDriver spark, String learnerType) {
+  public Ranker(Properties props, ESDriver es, SparkDriver spark) {
     super(props, es, spark);
-    this.learnerType = learnerType;
-    le = new Learner(learnerType, spark, props.getProperty(MudrodConstants.SVM_SGD_MODEL));
+    if("1".equals(props.getProperty(MudrodConstants.RANKING_ML)))
+      le = new Learner(spark, props.getProperty(MudrodConstants.RANKING_MODEL));
   }
 
   /**
@@ -138,6 +133,8 @@ private double getNDForm(double d) {
    * @return ranked result list
    */
   public List<SResult> rank(List<SResult> resultList) {
+    if(le==null) return resultList;
+    
     for (int i = 0; i < resultList.size(); i++) {
       for (int m = 0; m < SResult.rlist.length; m++) {
         String att = SResult.rlist[m].split("_")[0];
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java
index cb4da7e..8a752a3 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/Learner.java
@@ -28,7 +28,6 @@
    *
    */
   private static final long serialVersionUID = 1L;
-  private static final String SPARKSVM = "SparkSVM";
   SVMModel model = null;
   transient SparkContext sc = null;
 
@@ -39,12 +38,10 @@
    * @param skd            an instance of spark driver
    * @param svmSgdModel    path to a trained model
    */
-  public Learner(String classifierName, SparkDriver skd, String svmSgdModel) {
-    if (classifierName.equals(SPARKSVM)) {
+  public Learner(SparkDriver skd, String svmSgdModel) {
       sc = skd.sc.sc();
       sc.addFile(svmSgdModel, true);
       model = SVMModel.load(sc, svmSgdModel);
-    }
   }
 
   /**
diff --git a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java
index 2f27aa0..ff55c85 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/ssearch/ranking/TrainingImporter.java
@@ -54,7 +54,7 @@ public void addMapping() {
           .startObject("dataID").field("type", "string").field("index", "not_analyzed").endObject().startObject("label").field("type", "string").field("index", "not_analyzed").endObject().endObject()
           .endObject().endObject();
 
-      es.getClient().admin().indices().preparePutMapping(props.getProperty("indexName")).setType("trainingranking").setSource(Mapping).execute().actionGet();
+      es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType("trainingranking").setSource(Mapping).execute().actionGet();
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -78,7 +78,7 @@ public void importTrainingSet(String dataFolder) throws IOException {
         String[] list = line.split(",");
         String query = file.getName().replace(".csv", "");
         if (list.length > 0) {
-          IndexRequest ir = new IndexRequest(props.getProperty("indexName"), "trainingranking")
+          IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), "trainingranking")
               .source(jsonBuilder().startObject().field("query", query).field("dataID", list[0]).field("label", list[list.length - 1]).endObject());
           es.getBulkProcessor().add(ir);
         }
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java
index e678854..886cd4a 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java
@@ -16,6 +16,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.utils.LabeledRowMatrix;
 import org.apache.sdap.mudrod.utils.MatrixUtil;
 import org.apache.sdap.mudrod.weblog.structure.ClickStream;
@@ -48,11 +49,11 @@ public Object execute() {
     LOG.info("Starting ClickStreamGenerator...");
     startTime = System.currentTimeMillis();
 
-    String clickstremMatrixFile = props.getProperty("clickstreamMatrix");
+    String clickstremMatrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
     try {
       SessionExtractor extractor = new SessionExtractor();
       JavaRDD<ClickStream> clickstreamRDD = extractor.extractClickStreamFromES(this.props, this.es, this.spark);
-      int weight = Integer.parseInt(props.getProperty("downloadWeight"));
+      int weight = Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_WEIGHT));
       JavaPairRDD<String, List<String>> metaddataQueryRDD = extractor.bulidDataQueryRDD(clickstreamRDD, weight);
       LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metaddataQueryRDD);
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
index 79a014e..3e782a7 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
@@ -56,20 +56,6 @@
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(CrawlerDetection.class);
 
-  public static final String CRAWLER = "crawler";
-  public static final String GOOGLE_BOT = "googlebot";
-  public static final String BING_BOT = "bingbot";
-  public static final String YAHOO_BOT = "slurp";
-  public static final String YACY_BOT = "yacybot";
-  public static final String ROGER_BOT = "rogerbot";
-  public static final String YANDEX_BOT = "yandexbot";
-
-  public static final String NO_AGENT_BOT = "-";
-  public static final String PERL_BOT = "libwww-perl/";
-  public static final String APACHE_HHTP = "apache-httpclient/";
-  public static final String JAVA_CLIENT = "java/";
-  public static final String CURL = "curl/";
-
   /**
    * Paramterized constructor to instantiate a configured instance of
    * {@link CrawlerDetection}
@@ -84,8 +70,8 @@ public CrawlerDetection(Properties props, ESDriver es, SparkDriver spark) {
     super(props, es, spark);
   }
 
-  public CrawlerDetection() {
-    super(null, null, null);
+  public CrawlerDetection(Properties props) {
+    super(props, null, null);
   }
 
   @Override
@@ -110,13 +96,11 @@ public Object execute() {
    * @return 1 if the log is initiated by crawler, 0 otherwise
    */
   public boolean checkKnownCrawler(String agent) {
-    agent = agent.toLowerCase();
-    if (agent.contains(CRAWLER) || agent.contains(GOOGLE_BOT) || agent.contains(BING_BOT) || agent.contains(APACHE_HHTP) || agent.contains(PERL_BOT) || agent.contains(YAHOO_BOT) || agent
-        .contains(YANDEX_BOT) || agent.contains(NO_AGENT_BOT) || agent.contains(PERL_BOT) || agent.contains(APACHE_HHTP) || agent.contains(JAVA_CLIENT) || agent.contains(CURL)) {
-      return true;
-    } else {
-      return false;
-    }
+    String[] crawlers = props.getProperty(MudrodConstants.BLACK_LIST_AGENT).split(",");
+    for (int i = 0; i < crawlers.length; i++) {
+      if (agent.toLowerCase().contains(crawlers[i].trim())) return true;
+    }  
+    return false;
   }
 
   public void checkByRate() throws InterruptedException, IOException {
@@ -138,7 +122,7 @@ public void checkByRate() throws InterruptedException, IOException {
   public void checkByRateInSequential() throws InterruptedException, IOException {
     es.createBulkProcessor();
 
-    int rate = Integer.parseInt(props.getProperty("sendingrate"));
+    int rate = Integer.parseInt(props.getProperty(MudrodConstants.REQUEST_RATE));
 
     Terms users = this.getUserTerms(this.httpType);
     LOG.info("Original User count: {}", Integer.toString(users.getBuckets().size()));
@@ -178,7 +162,7 @@ void checkByRateInParallel() throws InterruptedException, IOException {
 
   private int checkByRate(ESDriver es, String user) {
 
-    int rate = Integer.parseInt(props.getProperty("sendingrate"));
+    int rate = Integer.parseInt(props.getProperty(MudrodConstants.REQUEST_RATE));
     Pattern pattern = Pattern.compile("get (.*?) http/*");
     Matcher matcher;
 
@@ -202,13 +186,13 @@ private int checkByRate(ESDriver es, String user) {
         for (SearchHit hit : scrollResp.getHits().getHits()) {
           Map<String, Object> result = hit.getSource();
           String logtype = (String) result.get("LogType");
-          if (logtype.equals("PO.DAAC")) {
+          if (logtype.equals(MudrodConstants.HTTP_LOG)) {
             String request = (String) result.get("Request");
             matcher = pattern.matcher(request.trim().toLowerCase());
             boolean find = false;
             while (matcher.find()) {
               request = matcher.group(1);
-              result.put("RequestUrl", "http://podaac.jpl.nasa.gov" + request);
+              result.put("RequestUrl", props.getProperty(MudrodConstants.BASE_URL) + request);
               find = true;
             }
             if (!find) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
index cd41fbe..1969e3e 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
@@ -60,9 +60,7 @@ public Object execute() {
    */
   public void generateBinaryMatrix() {
     try {
-
-      System.out.println(props.getProperty("userHistoryMatrix"));
-      File file = new File(props.getProperty("userHistoryMatrix"));
+      File file = new File(props.getProperty(MudrodConstants.USER_HISTORY_PATH));
       if (file.exists()) {
         file.delete();
       }
@@ -79,13 +77,22 @@ public void generateBinaryMatrix() {
       String[] logIndices = logIndexList.toArray(new String[0]);
       String[] statictypeArray = new String[] { this.sessionStats };
       int docCount = es.getDocCount(logIndices, statictypeArray);
+      
+      LOG.info("{}: {}", this.sessionStats, docCount);      
+
+      if (docCount==0) 
+      { 
+        bw.close(); 
+        file.delete();
+        return;
+      }
 
       SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
           .addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet();
       Terms ips = sr.getAggregations().get("IPs");
       List<String> ipList = new ArrayList<>();
       for (Terms.Bucket entry : ips.getBuckets()) {
-        if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) { // filter
+        if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) { // filter
           // out
           // less
           // active users/ips
@@ -107,7 +114,7 @@ public void generateBinaryMatrix() {
         Terms ipAgg = keyword.getAggregations().get("IPAgg");
 
         int distinctUser = ipAgg.getBuckets().size();
-        if (distinctUser > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) {
+        if (distinctUser >= Integer.parseInt(props.getProperty(MudrodConstants.QUERY_MIN))) {
           bw.write(keyword.getKey() + ",");
           for (Terms.Bucket IP : ipAgg.getBuckets()) {
 
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
index 933b061..306ce2e 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
@@ -188,14 +188,14 @@ public void readFileInParallel(String httplogpath, String ftplogpath) {
 
   public void importHttpfile(String httplogpath) {
     // import http logs
-    JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, this.partition).map(s -> ApacheAccessLog.parseFromLogLine(s)).filter(ApacheAccessLog::checknull);
+    JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, this.partition).map(s -> ApacheAccessLog.parseFromLogLine(s, props)).filter(ApacheAccessLog::checknull);
 
     JavaEsSpark.saveJsonToEs(accessLogs, logIndex + "/" + this.httpType);
   }
 
   public void importFtpfile(String ftplogpath) {
     // import ftp logs
-    JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, this.partition).map(s -> FtpLog.parseFromLogLine(s)).filter(FtpLog::checknull);
+    JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, this.partition).map(s -> FtpLog.parseFromLogLine(s, props)).filter(FtpLog::checknull);
 
     JavaEsSpark.saveJsonToEs(ftpLogs, logIndex + "/" + this.ftpType);
   }
@@ -263,7 +263,7 @@ public void parseSingleLineFTP(String log, String index, String type) {
       IndexRequest ir;
       try {
         ir = new IndexRequest(index, type)
-            .source(jsonBuilder().startObject().field("LogType", "ftp").field("IP", ip).field("Time", date).field("Request", request).field("Bytes", Long.parseLong(bytes)).endObject());
+            .source(jsonBuilder().startObject().field("LogType", MudrodConstants.FTP_LOG).field("IP", ip).field("Time", date).field("Request", request).field("Bytes", Long.parseLong(bytes)).endObject());
         es.getBulkProcessor().add(ir);
       } catch (NumberFormatException e) {
         LOG.error("Error whilst processing numbers", e);
@@ -306,9 +306,10 @@ public void parseSingleLineHTTP(String log, String index, String type) {
     CrawlerDetection crawlerDe = new CrawlerDetection(this.props, this.es, this.spark);
     if (!crawlerDe.checkKnownCrawler(agent)) {
       boolean tag = false;
-      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" };
-      for (String mimeType : mimeTypes) {
-        if (request.contains(mimeType)) {
+      String[] mimeTypes = props.getProperty(MudrodConstants.BLACK_LIST_REQUEST).split(",");
+      for(String str:mimeTypes)
+      {
+        if (request.contains(str.trim())) {
           tag = true;
           break;
         }
@@ -325,7 +326,7 @@ private void executeBulkRequest(IndexRequest ir, String index, String type, Matc
     IndexRequest newIr = ir;
     try {
       newIr = new IndexRequest(index, type).source(
-          jsonBuilder().startObject().field("LogType", "PO.DAAC").field("IP", matcher.group(1)).field("Time", date).field("Request", matcher.group(5)).field("Response", matcher.group(6))
+          jsonBuilder().startObject().field("LogType", MudrodConstants.HTTP_LOG).field("IP", matcher.group(1)).field("Time", date).field("Request", matcher.group(5)).field("Response", matcher.group(6))
               .field("Bytes", Integer.parseInt(bytes)).field("Referer", matcher.group(8)).field("Browser", matcher.group(9)).endObject());
 
       es.getBulkProcessor().add(newIr);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
index 3fcc67f..f3a7f6d 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
@@ -47,17 +47,17 @@
 
   public LogAbstract(Properties props, ESDriver es, SparkDriver spark) {
     super(props, es, spark);
-    if (props != null) {
+    if (props != null && es != null) {
       initLogIndex();
     }
   }
 
   protected void initLogIndex() {
     logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX);
-    httpType = props.getProperty(MudrodConstants.HTTP_TYPE_PREFIX);
-    ftpType = props.getProperty(MudrodConstants.FTP_TYPE_PREFIX);
-    cleanupType = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
-    sessionStats = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);
+    httpType = MudrodConstants.HTTP_TYPE;
+    ftpType = MudrodConstants.FTP_TYPE;
+    cleanupType = MudrodConstants.CLEANUP_TYPE;
+    sessionStats = MudrodConstants.SESSION_STATS_TYPE;
 
     InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS);
     InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
index d884bf9..4e170d7 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionGenerator.java
@@ -75,11 +75,11 @@ public Object execute() {
   public void generateSession() {
     try {
       es.createBulkProcessor();
-      genSessionByReferer(Integer.parseInt(props.getProperty("timegap")));
+      genSessionByReferer(Integer.parseInt(props.getProperty(MudrodConstants.REQUEST_TIME_GAP)));
       es.destroyBulkProcessor();
 
       es.createBulkProcessor();
-      combineShortSessions(Integer.parseInt(props.getProperty("timegap")));
+      combineShortSessions(Integer.parseInt(props.getProperty(MudrodConstants.REQUEST_TIME_GAP)));
       es.destroyBulkProcessor();
     } catch (ElasticsearchException e) {
       LOG.error("Error whilst executing bulk processor.", e);
@@ -241,7 +241,7 @@ public int genSessionByReferer(ESDriver es, String user, int timeThres) throws E
     String logType = "";
     String id = "";
     String ip = user;
-    String indexUrl = "http://podaac.jpl.nasa.gov/";
+    String indexUrl = props.getProperty(MudrodConstants.BASE_URL) + "/";
     DateTime time = null;
     DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
 
@@ -254,7 +254,7 @@ public int genSessionByReferer(ESDriver es, String user, int timeThres) throws E
         time = fmt.parseDateTime((String) result.get("Time"));
         id = hit.getId();
 
-        if ("PO.DAAC".equals(logType)) {
+        if (MudrodConstants.HTTP_LOG.equals(logType)) {
           if ("-".equals(referer) || referer.equals(indexUrl) || !referer.contains(indexUrl)) {
             sessionCountIn++;
             sessionReqs.put(ip + "@" + sessionCountIn, new HashMap<String, DateTime>());
@@ -313,7 +313,7 @@ public int genSessionByReferer(ESDriver es, String user, int timeThres) throws E
               }
             }
           }
-        } else if ("ftp".equals(logType)) {
+        } else if (MudrodConstants.FTP_LOG.equals(logType)) {
 
           // may affect computation efficiency
           Map<String, DateTime> requests = sessionReqs.get(ip + "@" + sessionCountIn);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
index f084a90..3cee9c7 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/SessionStatistic.java
@@ -111,7 +111,7 @@ public void processSessionInSequential() throws IOException, InterruptedExceptio
    * @return dataset ID
    */
   public String findDataset(String request) {
-    String pattern1 = "/dataset/";
+    String pattern1 = props.getProperty(MudrodConstants.VIEW_MARKER);
     String pattern2;
     if (request.contains("?")) {
       pattern2 = "?";
@@ -221,14 +221,14 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int
           request = matcher.group(1);
         }
 
-        String datasetlist = "/datasetlist?";
-        String dataset = "/dataset/";
+        String datasetlist = props.getProperty(MudrodConstants.SEARCH_MARKER);
+        String dataset = props.getProperty(MudrodConstants.VIEW_MARKER);
         if (request.contains(datasetlist)) {
           searchDataListRequestCount++;
 
           RequestUrl requestURL = new RequestUrl();
           String infoStr = requestURL.getSearchInfo(request) + ",";
-          String info = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+          String info = es.customAnalyzing(props.getProperty(MudrodConstants.ES_INDEX_NAME), infoStr);
 
           if (!",".equals(info)) {
             if ("".equals(keywords)) {
@@ -249,19 +249,13 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int
           searchDataRequestCount++;
           if (findDataset(request) != null) {
             String view = findDataset(request);
-
-            if ("".equals(views)) {
+            if ("".equals(views)) 
               views = view;
-            } else {
-              if (views.contains(view)) {
-
-              } else {
-                views = views + "," + view;
-              }
-            }
-          }
+             else if (!views.contains(view)) 
+                views = views + "," + view;          
+           }          
         }
-        if ("ftp".equals(logType)) {
+        if (MudrodConstants.FTP_LOG.equals(logType)) {
           ftpRequestCount++;
           String download = "";
           String requestLowercase = request.toLowerCase();
@@ -294,12 +288,13 @@ public int processSession(ESDriver es, String sessionId) throws IOException, Int
       keywordsNum = keywords.split(",").length;
     }
 
-    if (searchDataListRequestCount != 0
-            && searchDataListRequestCount <= Integer.parseInt(props.getProperty("searchf"))
-            && searchDataRequestCount != 0
-            && searchDataRequestCount <= Integer.parseInt(props.getProperty("viewf"))
-            && ftpRequestCount <= Integer.parseInt(props.getProperty("downloadf"))) {
-      String sessionURL = props.getProperty("SessionPort") + props.getProperty("SessionUrl") + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType;
+    if (searchDataListRequestCount != 0 && 
+        searchDataListRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.SEARCH_F)) && 
+        searchDataRequestCount != 0 && 
+        searchDataRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.VIEW_F)) && 
+        ftpRequestCount <= Integer.parseInt(props.getProperty(MudrodConstants.DOWNLOAD_F))) 
+    {
+      String sessionURL = props.getProperty(MudrodConstants.SESSION_PORT) + props.getProperty(MudrodConstants.SESSION_URL) + "?sessionid=" + sessionId + "&sessionType=" + outputType + "&requestType=" + inputType;
       sessionCount = 1;
 
       IndexRequest ir = new IndexRequest(logIndex, outputType).source(
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
index 68fad4d..70c4067 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/ClickStreamAnalyzer.java
@@ -16,6 +16,7 @@
 import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
 import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.semantics.SVDAnalyzer;
 import org.apache.sdap.mudrod.ssearch.ClickstreamImporter;
 import org.apache.sdap.mudrod.utils.LinkageTriple;
@@ -50,13 +51,15 @@ public Object execute() {
     LOG.info("Starting ClickStreamAnalyzer...");
     startTime = System.currentTimeMillis();
     try {
-      String clickstream_matrixFile = props.getProperty("clickstreamMatrix");
+      String clickstream_matrixFile = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
       File f = new File(clickstream_matrixFile);
       if (f.exists()) {
         SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
-        svd.getSVDMatrix(props.getProperty("clickstreamMatrix"), Integer.parseInt(props.getProperty("clickstreamSVDDimension")), props.getProperty("clickstreamSVDMatrix_tmp"));
-        List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty("clickstreamSVDMatrix_tmp"));
-        svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("clickStreamLinkageType"));
+        svd.getSVDMatrix(clickstream_matrixFile, 
+            Integer.parseInt(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_DIM)), 
+            props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH));
+        List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty(MudrodConstants.CLICKSTREAM_SVD_PATH));
+        svd.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), MudrodConstants.CLICK_STREAM_LINKAGE_TYPE);
       
         // Store click stream in ES for the ranking use
         ClickstreamImporter cs = new ClickstreamImporter(props, es, spark);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
index d95475c..c4c5da4 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/process/UserHistoryAnalyzer.java
@@ -49,7 +49,7 @@ public Object execute() {
     startTime = System.currentTimeMillis();
 
     SemanticAnalyzer sa = new SemanticAnalyzer(props, es, spark);
-    List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty("userHistoryMatrix"));
+    List<LinkageTriple> tripleList = sa.calTermSimfromMatrix(props.getProperty(MudrodConstants.USER_HISTORY_PATH));
     sa.saveToES(tripleList, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.USE_HISTORY_LINKAGE_TYPE));
 
     endTime = System.currentTimeMillis();
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
index 985b2d7..050d19d 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/ApacheAccessLog.java
@@ -15,35 +15,26 @@
 
 import com.google.gson.Gson;
 
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.weblog.pre.CrawlerDetection;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.sdap.mudrod.weblog.pre.CrawlerDetection;
-
 /**
  * This class represents an Apache access log line. See
  * http://httpd.apache.org/docs/2.2/logs.html for more details.
  */
 public class ApacheAccessLog extends WebLog implements Serializable {
-
-
-  /**
-   * 
-   */
-  private static final long serialVersionUID = 1L;
-
-  public ApacheAccessLog() {
-    //default constructor
-  }
-
-  String response;
-  String referer;
-  String browser;
+  String Response;
+  String Referer;
+  String Browser;
 
   @Override
   public double getBytes() {
@@ -51,19 +42,22 @@ public double getBytes() {
   }
 
   public String getBrowser() {
-    return this.browser;
+    return this.Browser;
   }
 
   public String getResponse() {
-    return this.response;
+    return this.Response;
   }
 
   public String getReferer() {
-    return this.referer;
+    return this.Referer;
   }
 
+  public ApacheAccessLog() {
+	  super();
+  }
 
-  public static String parseFromLogLine(String log) throws IOException, ParseException {
+  public static String parseFromLogLine(String log, Properties props) throws IOException, ParseException {
 
     String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\"";
     final int numFields = 9;
@@ -89,12 +83,12 @@ public static String parseFromLogLine(String log) throws IOException, ParseExcep
 
     String request = matcher.group(5).toLowerCase();
     String agent = matcher.group(9);
-    CrawlerDetection crawlerDe = new CrawlerDetection();
+    CrawlerDetection crawlerDe = new CrawlerDetection(props);
     if (crawlerDe.checkKnownCrawler(agent)) {
       return lineJson;
     } else {
 
-      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" };
+      String[] mimeTypes = props.getProperty(MudrodConstants.BLACK_LIST_REQUEST).split(",");
       for (String mimeType : mimeTypes) {
         if (request.contains(mimeType)) {
           return lineJson;
@@ -102,13 +96,13 @@ public static String parseFromLogLine(String log) throws IOException, ParseExcep
       }
 
       ApacheAccessLog accesslog = new ApacheAccessLog();
-      accesslog.LogType = "PO.DAAC";
+      accesslog.LogType = MudrodConstants.HTTP_LOG;
       accesslog.IP = matcher.group(1);
       accesslog.Request = matcher.group(5);
-      accesslog.response = matcher.group(6);
+      accesslog.Response = matcher.group(6);
       accesslog.Bytes = Double.parseDouble(bytes);
-      accesslog.referer = matcher.group(8);
-      accesslog.browser = matcher.group(9);
+      accesslog.Referer = matcher.group(8);
+      accesslog.Browser = matcher.group(9);
       SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.sss'Z'");
       accesslog.Time = df.format(date);
 
@@ -116,7 +110,7 @@ public static String parseFromLogLine(String log) throws IOException, ParseExcep
       lineJson = gson.toJson(accesslog);
 
       return lineJson;
-    }
+  }
   }
 
   public static boolean checknull(WebLog s) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java
index 488fe52..9f39655 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/FtpLog.java
@@ -15,6 +15,7 @@
 
 import com.google.gson.Gson;
 
+import org.apache.sdap.mudrod.main.MudrodConstants;
 import org.apache.sdap.mudrod.weblog.pre.ImportLogFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,6 +23,7 @@
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Properties;
 
 /**
  * This class represents an FTP access log line.
@@ -29,8 +31,12 @@
 public class FtpLog extends WebLog implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class);
+  
+  public FtpLog() {
+	  super();
+  }
 
-  public static String parseFromLogLine(String log) {
+  public static String parseFromLogLine(String log, Properties props) {
 
     try {
       String ip = log.split(" +")[6];
@@ -47,7 +53,7 @@ public static String parseFromLogLine(String log) {
 
       if (!request.contains("/misc/") && !request.contains("readme")) {
         FtpLog ftplog = new FtpLog();
-        ftplog.LogType = "ftp";
+        ftplog.LogType = MudrodConstants.FTP_LOG;
         ftplog.IP = ip;
         ftplog.Request = request;
         ftplog.Bytes = Double.parseDouble(bytes);
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java
index f86438d..05a3395 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/RequestUrl.java
@@ -117,7 +117,7 @@ private static String truncateUrlPage(String strURL) {
   }
 
   /**
-   * GetSearchInfo: Get search information from url link
+   * GetSearchInfo: Get search information/keywords from url link
    *
    * @param URL request url
    * @return search params
@@ -186,35 +186,6 @@ public String getSearchInfo(String URL) throws UnsupportedEncodingException {
     return String.join(",", info);
   }
 
-  /**
-   * GetSearchWord: Get search words from url link
-   *
-   * @param url request url
-   * @return query
-   */
-  public static String getSearchWord(String url) {
-    String keyword = "";
-
-    Map<String, String> mapRequest = RequestUrl.uRLRequest(url);
-    if (mapRequest.get("search") != null) {
-      try {
-        keyword = mapRequest.get("search");
-
-        keyword = URLDecoder.decode(keyword.replaceAll("%(?![0-9a-fA-F]{2})", "%25"), "UTF-8");
-        if (keyword.contains("%2b") || keyword.contains("%20") || keyword.contains("%25")) {
-          keyword = keyword.replace("%2b", " ");
-          keyword = keyword.replace("%20", " ");
-          keyword = keyword.replace("%25", " ");
-        }
-        keyword = keyword.replaceAll("[-+^:,*_\"]", " ").replace("\\", " ").replaceAll("\\s+", " ").trim();
-      } catch (UnsupportedEncodingException e) {
-        LOG.error(mapRequest.get("search"));
-        e.printStackTrace();
-      }
-    }
-
-    return keyword;
-  }
 
   /**
    * GetFilterInfo: Get filter params from url link
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java
index f11efc6..31bef0c 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/Session.java
@@ -16,8 +16,9 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
 import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
@@ -214,7 +215,7 @@ private SessionTree getSessionTree(String indexName, String type, String session
       String logType = (String) result.get("LogType");
       String referer = (String) result.get("Referer");
 
-      SessionNode node = new SessionNode(request, logType, referer, time, seq);
+      SessionNode node = new SessionNode(request, logType, referer, props.getProperty(MudrodConstants.BASE_URL), time, seq);
       tree.insert(node);
       seq++;
     }
@@ -231,7 +232,7 @@ private SessionTree getSessionTree(String indexName, String type, String session
    * @throws UnsupportedEncodingException UnsupportedEncodingException
    */
   private JsonElement getRequests(String cleanuptype, String sessionID) throws UnsupportedEncodingException {
-    SearchResponse response = es.getClient().prepareSearch(props.getProperty("indexName")).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100)
+    SearchResponse response = es.getClient().prepareSearch(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setTypes(cleanuptype).setQuery(QueryBuilders.termQuery("SessionID", sessionID)).setSize(100)
         .addSort("Time", SortOrder.ASC).execute().actionGet();
 
     Gson gson = new Gson();
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionExtractor.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionExtractor.java
index 6adaf97..85b6961 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionExtractor.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionExtractor.java
@@ -247,17 +247,17 @@ public SessionExtractor() {
    */
   protected List<String> getSessions(Properties props, ESDriver es, String logIndex) {
 
-    String cleanupPrefix = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
-    String sessionStatPrefix = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);
+    String cleanupType = MudrodConstants.CLEANUP_TYPE;
+    String sessionStatType = MudrodConstants.SESSION_STATS_TYPE;
 
     List<String> sessions = new ArrayList<>();
-    SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(sessionStatPrefix).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+    SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(sessionStatType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
             .actionGet();
     while (true) {
       for (SearchHit hit : scrollResp.getHits().getHits()) {
         Map<String, Object> session = hit.getSource();
         String sessionID = (String) session.get("SessionID");
-        sessions.add(sessionID + "," + logIndex + "," + cleanupPrefix);
+        sessions.add(sessionID + "," + logIndex + "," + cleanupType);
       }
 
       scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
@@ -381,7 +381,7 @@ public Boolean call(Tuple2<String, Double> arg0) throws Exception {
     List<String> result = new ArrayList<>();
     List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
     for (String logIndex : logIndexList) {
-      SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(props.getProperty(MudrodConstants.SESSION_STATS_PREFIX)).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery())
+      SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(MudrodConstants.SESSION_STATS_TYPE).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery())
               .setSize(100).execute().actionGet();
       while (true) {
         for (SearchHit hit : scrollResp.getHits().getHits()) {
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionNode.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionNode.java
index 5e43f3e..6378615 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionNode.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionNode.java
@@ -65,12 +65,12 @@ public SessionNode() {
    * @param time:    request time of node
    * @param seq:     sequence of this node
    */
-  public SessionNode(String request, String logType, String referer, String time, int seq) {
+  public SessionNode(String request, String logType, String referer, String basicUrl, String time, int seq) {
     this.logType = logType;
     this.time = time;
     this.seq = seq;
     this.setRequest(request);
-    this.setReferer(referer);
+    this.setReferer(referer, basicUrl);
     this.setKey(request, logType);
   }
 
@@ -79,12 +79,12 @@ public SessionNode(String request, String logType, String referer, String time,
    *
    * @param referer previous request url
    */
-  public void setReferer(String referer) {
+  public void setReferer(String referer, String basicUrl) {
     if (referer == null) {
       this.referer = "";
       return;
     }
-    this.referer = referer.toLowerCase().replace("http://podaac.jpl.nasa.gov", "");
+	this.referer= referer.toLowerCase().replace(basicUrl, "");
   }
 
   /**
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionTree.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionTree.java
index ac547dc..7d31129 100644
--- a/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionTree.java
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/structure/SessionTree.java
@@ -19,6 +19,8 @@
 
 import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
 import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +68,7 @@
    */
   public SessionTree(Properties props, ESDriver es, SessionNode rootData, String sessionID, String cleanupType) {
     super(props, es, null);
-    root = new SessionNode("root", "root", "", "", 0);
+    root = new SessionNode("root", "root", "", props.getProperty(MudrodConstants.BASE_URL), "", 0);
     tmpnode = root;
     this.sessionID = sessionID;
     this.cleanupType = cleanupType;
@@ -82,7 +84,7 @@ public SessionTree(Properties props, ESDriver es, SessionNode rootData, String s
    */
   public SessionTree(Properties props, ESDriver es, String sessionID, String cleanupType) {
     super(props, es, null);
-    root = new SessionNode("root", "root", "", "", 0);
+    root = new SessionNode("root", "root", "", props.getProperty(MudrodConstants.BASE_URL), "", 0);
     root.setParent(root);
     tmpnode = root;
     this.sessionID = sessionID;
@@ -97,16 +99,16 @@ public SessionTree(Properties props, ESDriver es, String sessionID, String clean
    */
   public SessionNode insert(SessionNode node) {
     // begin with datasetlist
-    if ("datasetlist".equals(node.getKey())) {
+    if (props.getProperty(MudrodConstants.SEARCH_MARKER).equals(node.getKey())) {
       this.binsert = true;
     }
     if (!this.binsert) {
       return null;
     }
     // remove unrelated node
-    if (!"datasetlist".equals(node.getKey()) &&
-            !"dataset".equals(node.getKey()) &&
-            !"ftp".equals(node.getKey())) {
+    if (!props.getProperty(MudrodConstants.SEARCH_MARKER).equals(node.getKey()) &&
+            !props.getProperty(MudrodConstants.VIEW_MARKER).equals(node.getKey()) &&
+            !MudrodConstants.FTP_LOG.equals(node.getKey())) {
       return null;
     }
     // remove dumplicated click
@@ -204,7 +206,7 @@ public JsonObject treeToJson(SessionNode node) {
       String viewquery = "";
       try {
         String infoStr = requestURL.getSearchInfo(viewnode.getRequest());
-        viewquery = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+        viewquery = es.customAnalyzing(props.getProperty(MudrodConstants.ES_INDEX_NAME), infoStr);
       } catch (UnsupportedEncodingException | InterruptedException | ExecutionException e) {
         LOG.warn("Exception getting search info. Ignoring...", e);
       }
@@ -487,7 +489,7 @@ private boolean insertHelper(SessionNode entry, SessionNode node) {
         String infoStr = requestURL.getSearchInfo(queryUrl);
         String query = null;
         try {
-          query = es.customAnalyzing(props.getProperty("indexName"), infoStr);
+          query = es.customAnalyzing(props.getProperty(MudrodConstants.ES_INDEX_NAME), infoStr);
         } catch (InterruptedException | ExecutionException e) {
           throw new RuntimeException("Error performing custom analyzing", e);
         }
diff --git a/core/src/main/resources/config.properties b/core/src/main/resources/config.properties
new file mode 100644
index 0000000..4c8991e
--- /dev/null
+++ b/core/src/main/resources/config.properties
@@ -0,0 +1,74 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you 
+# may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Database configuration
+mudrod.cluster.name=MudrodES
+mudrod.es.transport.tcp.port = 9300
+mudrod.es.unicast.hosts = 127.0.0.1
+mudrod.es.http.port = 9200
+mudrod.es.index = mudrod
+    
+# Spark related
+# Log processing type. Possible values include 'sequential' or 'parallel'
+mudrod.processing.type = parallel
+mudrod.spark.app.name = MudrodSparkApp
+mudrod.spark.master = local[4]
+mudrod.spark.optimize = repartition
+    
+# Web log processing configuration
+# index name has to be all lowercase
+mudrod.log.index = log
+mudrod.ftp.prefix = FTP.
+mudrod.http.prefix = WWW.
+mudrod.base.url = http://podaac.jpl.nasa.gov
+mudrod.black.request.list = .js, .css, .jpg, .png, .ico, image_captcha, autocomplete, .gif, /alldata/, /api/, get / http/1.1, .jpeg, /ws/
+mudrod.black.agent.list = crawler, googlebot, bingbot, slurp, yacybot, rogerbot, yandexbot, -, apache-httpclient, java, curl
+mudrod.search.freq = 100
+mudrod.view.freq = 200
+mudrod.download.freq = 100
+mudrod.request.rate = 30
+mudrod.session.port = 8080
+mudrod.session.url = /mudrod-service/session.html
+mudrod.request.time.gap = 600   
+mudrod.view.url.marker = /dataset/
+mudrod.search.url.marker = /datasetlist?
+# In order to better parse a URL (getting search keywords, etc.), please consider customizing 
+# org.apache.sdap.mudrod.weblog.structure.RequestUrl - GetSearchInfo, getFilterInfo
+	
+# User search history
+mudrod.query.min = 0
+mudrod.user.history.weight = 2
+	
+# clickstream
+mudrod.download.weight = 3
+mudrod.clickstream.svd.d = 50
+mudrod.clickstream.weight = 2
+			 	
+# metadata
+mudrod.metadata.download = 0
+mudrod.metadata.download.url = https://podaac.jpl.nasa.gov/api/dataset?startIndex=$startIndex&amp;entries=10&amp;sortField=Dataset-AllTimePopularity&amp;sortOrder=asc&amp;id=&amp;value=&amp;search=
+mudrod.metadata.svd.d = 50
+mudrod.metadata.url = null
+mudrod.metadata.weight = 1
+mudrod.metadata.type = RawMetadata
+       	
+# ranking, ${svmSgdModel.value} is resolved at build time. See the property in core/pom.xml for the value
+mudrod.ranking.machine.learning = 1
+mudrod.ranking.model = ${svmSgdModel.value}.zip
+       	
+# recommendation
+mudrod.metadata.id = Dataset-ShortName
+mudrod.metadata.semantic.fields = DatasetParameter-Term,DatasetParameter-Variable,Dataset-ExtractTerm
+
+# ontology service implementation. Possible values include EsipPortal - EsipPortalOntology EsipCOR - EsipCOROntology Local - org.apache.sdap.mudrod.ontology.process.Local
+mudrod.ontology.implementation = Local
+mudrod.ontology.weight = 2
diff --git a/core/src/main/resources/config.xml b/core/src/main/resources/config.xml
deleted file mode 100644
index 5a5bcee..0000000
--- a/core/src/main/resources/config.xml
+++ /dev/null
@@ -1,129 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- 
-  Licensed under the Apache License, Version 2.0 (the "License"); 
-  you may not use this file except in compliance with the License. 
-  You may obtain  a copy of the License at 
-  
-  http://www.apache.org/licenses/LICENSE-2.0 Unless 
-  
-  required by applicable law or agreed to in writing, software 
-  distributed under the License is distributed on an "AS IS" 
-  BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
-  express or implied. See the License for the specific language 
-  governing permissions and limitations under the License. 
--->
-<Config>
-    <para name="Cleanup_type_prefix">cleanupLog</para>
-
-    <para name="clickStreamLinkageType">ClickStreamLinkage</para>
-
-    <para name="clickStreamMatrixType">clickstreamMatrix</para>
-
-    <para name="clickstreamSVDDimension">50</para>
-
-    <para name="clickStream_w">2</para>
-
-    <para name="commentType">comment</para>
-
-    <para name="downloadf">100</para>
-
-    <para name="downloadWeight">3</para>
-
-    <para name="clusterName">MudrodES</para>
-
-    <para name="ES_Transport_TCP_Port">9300</para>
-
-    <para name="ES_unicast_hosts">127.0.0.1</para>
-
-    <para name="ES_HTTP_port">9200</para>
-
-    <para name="indexName">mudrod</para>
-
-    <para name="ftpPrefix">FTP.</para>
-    
-    <para name="FTP_type_prefix">rawftp</para>
-
-    <para name="HTTP_type_prefix">rawhttp</para>
-
-    <para name="httpPrefix">WWW.</para>
-
-    <para name="logIndexName">podaaclog</para>
-
-    <para name="metadataLinkageType">MetadataLinkage</para>
-
-    <para name="metadataSVDDimension">50</para>
-
-    <para name="metadataurl">null</para>
-
-    <para name="metadata_w">1</para>
-
-    <para name="mini_userHistory">5</para>
-
-    <!--
-    The ontology service implementation. Possible values include
-     EsipPortal - EsipPortalOntology
-     EsipCOR - EsipCOROntology
-     Local - org.apache.sdap.mudrod.ontology.process.Local
-     -->
-    <para name="mudrod.ontology.implementation">Local</para>
-
-    <para name="ontologyLinkageType">SWEETLinkage</para>
-
-    <para name="ontology_w">2</para>
-
-    <!--
-     Log processing type. Possible values include
-     'sequential' or 'parallel'.
-     -->
-    <para name="processingType">parallel</para>
-
-    <para name="raw_metadataType">RawMetadata</para>
-
-    <para name="searchf">100</para>
-
-    <para name="sendingrate">30</para>
-
-    <para name="SessionPort">8080</para>
-
-    <para name="SessionStats_prefix">sessionstats</para>
-
-    <para name="SessionUrl">/mudrod-service/session.html</para>
-
-    <!-- The name of your application. This will appear in the UI and in log data.-->
-    <para name="spark.app.name">MudrodSparkApp</para>
-
-    <!--
-    The default Spark cluster manager to connect to. See the list of allowed master URL's.
-    For more information, consult http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
-    -->
-    <para name="spark.master">local[4]</para>
-
-    <!-- ${svmSgdModel.value} is resolved at build time. See the property in core/pom.xml for the value -->
-    <para name="svmSgdModel">${svmSgdModel.value}.zip</para>
-    
-    <para name="timegap">600</para>
-    
-    <para name="userHistoryLinkageType">UserHistoryLinkage</para>
-    
-    <para name="userHistory_w">2</para>
-
-    <para name="viewf">200</para>
-
-
-
-    <!-- FOLLOWING NEEDS TO BE ADDED TO MudrodConstants.java -->
-    <para name="recom_metadataType">RecomMetadata</para>
-    <!-- recommendation -->
-    <para name="metadataTermTFIDFSimType">MetadataTermTFIDFSim</para>
-    <para name="metadataWordTFIDFSimType">MetadataWordTFIDFSim</para>
-    <para name="metadataCodeSimType">MetadataCodeSim</para>
-    <para name="metadataSessionBasedSimType">MetadataSBSim</para>
-    <para name="metadataTopicSimType">MetadataTBSim</para>
-    <!--
-    Log processing parallel optimization type. Possible values include
-    default - MudrodConstants.PARALLEL_OPTIMIZATION_DEFAULT
-    repartition - MudrodConstants.PARALLEL_OPTIMIZATION_REPARTITION
-    -->
-    <para name="parallelOptimization">repartition</para>
-
-</Config>
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/RecomDatasetsResource.java b/service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/RecomDatasetsResource.java
deleted file mode 100644
index 072ba5e..0000000
--- a/service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/RecomDatasetsResource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sdap.mudrod.services.recommendation;
-
-import com.google.gson.JsonObject;
-
-import javax.servlet.ServletContext;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.sdap.mudrod.main.MudrodEngine;
-import org.apache.sdap.mudrod.recommendation.structure.RecomData;
-
-/**
- * A Dataset recommendation resource.
- */
-@Path("/recommendation")
-public class RecomDatasetsResource {
-
-  private MudrodEngine mEngine;
-
-  public RecomDatasetsResource(@Context ServletContext sc) {
-    this.mEngine = (MudrodEngine) sc.getAttribute("MudrodInstance");
-  }
-
-  @GET
-  @Path("/status")
-  @Produces("text/html")
-  public Response status() {
-    return Response.ok("<h1>This is MUDROD Recommendation Datasets Resource: running correctly...</h1>").build();
-  }
-
-  @PUT
-  @Path("{shortname}")
-  @Produces(MediaType.APPLICATION_JSON)
-  @Consumes("text/plain")
-  public Response hybridRecommendation(@PathParam("shortname") String shortName) {
-    JsonObject json = new JsonObject();
-    if (shortName != null) {
-      RecomData recom = new RecomData(mEngine.getConfig(), mEngine.getESDriver(), null);
-      json = new JsonObject();
-      json.add("RecommendationData", recom.getRecomDataInJson(shortName, 10));
-    }
-    return Response.ok(json.toString(), MediaType.APPLICATION_JSON).build();
-  }
-}
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/DefaultExceptionMapper.java b/service/src/main/java/org/apache/sdap/mudrod/services/DefaultExceptionMapper.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/DefaultExceptionMapper.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/DefaultExceptionMapper.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/MudrodContextListener.java b/service/src/main/java/org/apache/sdap/mudrod/services/MudrodContextListener.java
similarity index 94%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/MudrodContextListener.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/MudrodContextListener.java
index e08bf78..72130d3 100644
--- a/service/src/main/java/gov/nasa/jpl/mudrod/services/MudrodContextListener.java
+++ b/service/src/main/java/org/apache/sdap/mudrod/services/MudrodContextListener.java
@@ -63,12 +63,10 @@ public void contextInitialized(ServletContextEvent arg0) {
 
     ServletContext ctx = arg0.getServletContext();
     Searcher searcher = new Searcher(props, me.getESDriver(), null);
-    Ranker ranker = new Ranker(props, me.getESDriver(), me.getSparkDriver(), "SparkSVM");
-    Ontology ontImpl = new OntologyFactory(props).getOntology();
+    Ranker ranker = new Ranker(props, me.getESDriver(), me.getSparkDriver());
     ctx.setAttribute("MudrodInstance", me);
     ctx.setAttribute("MudrodSearcher", searcher);
     ctx.setAttribute("MudrodRanker", ranker);
-    ctx.setAttribute("Ontology", ontImpl);
   }
 
 }
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/autocomplete/AutoCompleteData.java b/service/src/main/java/org/apache/sdap/mudrod/services/autocomplete/AutoCompleteData.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/autocomplete/AutoCompleteData.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/autocomplete/AutoCompleteData.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/autocomplete/AutoCompleteResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/autocomplete/AutoCompleteResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/autocomplete/AutoCompleteResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/autocomplete/AutoCompleteResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/autocomplete/package-info.java b/service/src/main/java/org/apache/sdap/mudrod/services/autocomplete/package-info.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/autocomplete/package-info.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/autocomplete/package-info.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/ontology/OntologyResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/ontology/OntologyResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/ontology/OntologyResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/ontology/OntologyResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/ontology/package-info.java b/service/src/main/java/org/apache/sdap/mudrod/services/ontology/package-info.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/ontology/package-info.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/ontology/package-info.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/package-info.java b/service/src/main/java/org/apache/sdap/mudrod/services/package-info.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/package-info.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/package-info.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/HybridRecomDatasetsResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/recommendation/HybridRecomDatasetsResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/HybridRecomDatasetsResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/recommendation/HybridRecomDatasetsResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/package-info.java b/service/src/main/java/org/apache/sdap/mudrod/services/recommendation/package-info.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/recommendation/package-info.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/recommendation/package-info.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/search/SearchDatasetDetailResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/search/SearchDatasetDetailResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/search/SearchDatasetDetailResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/search/SearchDatasetDetailResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/search/SearchMetadataResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/search/SearchMetadataResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/search/SearchMetadataResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/search/SearchMetadataResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/search/SearchVocabResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/search/SearchVocabResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/search/SearchVocabResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/search/SearchVocabResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/search/SessionDetailResource.java b/service/src/main/java/org/apache/sdap/mudrod/services/search/SessionDetailResource.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/search/SessionDetailResource.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/search/SessionDetailResource.java
diff --git a/service/src/main/java/gov/nasa/jpl/mudrod/services/search/package-info.java b/service/src/main/java/org/apache/sdap/mudrod/services/search/package-info.java
similarity index 100%
rename from service/src/main/java/gov/nasa/jpl/mudrod/services/search/package-info.java
rename to service/src/main/java/org/apache/sdap/mudrod/services/search/package-info.java
diff --git a/service/src/main/webapp/WEB-INF/web.xml b/service/src/main/webapp/WEB-INF/web.xml
index 195ef01..e62e668 100644
--- a/service/src/main/webapp/WEB-INF/web.xml
+++ b/service/src/main/webapp/WEB-INF/web.xml
@@ -28,7 +28,6 @@
                 org.apache.sdap.mudrod.services.autocomplete.AutoCompleteResource,
                 org.apache.sdap.mudrod.services.search.SearchDatasetDetailResource,
                 org.apache.sdap.mudrod.services.recommendation.HybridRecomDatasetsResource,
-                org.apache.sdap.mudrod.services.recommendation.RecomDatasetsResource,
                 org.apache.sdap.mudrod.services.search.SearchMetadataResource,
                 org.apache.sdap.mudrod.services.search.SearchVocabResource,
                 org.apache.sdap.mudrod.services.search.SessionDetailResource,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services