Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:34:59 UTC

[18/21] incubator-sdap-mudrod git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/LocalOntology.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/LocalOntology.java b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/LocalOntology.java
new file mode 100644
index 0000000..55ca51d
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/LocalOntology.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.ontology.process;
+
+import gov.nasa.jpl.mudrod.ontology.Ontology;
+
+import org.apache.jena.ontology.Individual;
+import org.apache.jena.ontology.OntClass;
+import org.apache.jena.ontology.OntModel;
+import org.apache.jena.ontology.OntModelSpec;
+import org.apache.jena.ontology.OntResource;
+import org.apache.jena.ontology.Restriction;
+import org.apache.jena.rdf.model.AnonId;
+import org.apache.jena.rdf.model.Literal;
+import org.apache.jena.rdf.model.ModelFactory;
+import org.apache.jena.rdf.model.Resource;
+import org.apache.jena.shared.PrefixMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The LocalOntology implementation enables us to work with Ontology files
+ * which are cached locally and available on the runtime classpath e.g.
+ * in <code>src/main/resources/ontology/...</code>.
+ * From here we can test and iterate on how the use of ontologies can enhance search.
+ */
+public class LocalOntology implements Ontology {
+
+  public static final Logger LOG = LoggerFactory.getLogger(LocalOntology.class);
+
+  public static final String DELIMITER_SEARCHTERM = " ";
+
+  private Map<Object, Object> searchTerms = new HashMap<>();
+  private static OntologyParser parser;
+  private static OntModel ontologyModel;
+  private Ontology ontology;
+  private static Map<AnonId, String> mAnonIDs = new HashMap<>();
+  private static int mAnonCount = 0;
+  private List<String> ontArrayList;
+
+  public LocalOntology() {
+    //only initialize the static variables the first time
+    //this constructor is called
+    if (ontology == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Creating new ontology");
+      }
+      parser = new OwlParser();
+      ontology = this;
+    }
+    if (ontologyModel == null)
+      ontologyModel = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM, null);
+    load();
+  }
+
+  /**
+   * Accessor for the {@link LocalOntology}
+   * instance implementation defined within <code>config.xml</code>.
+   *
+   * @return a {@link LocalOntology}
+   */
+  public Ontology getInstance() {
+    if (ontology == null) {
+      ontology = new LocalOntology();
+    }
+    return ontology;
+  }
+
+  /**
+   * Load the default <i>sweetAll.owl</i> ontology
+   * from <a href="https://raw.githubusercontent.com/ESIPFed/sweet/master/2.4/sweetAll.owl">
+   * https://raw.githubusercontent.com/ESIPFed/sweet/master/2.4/sweetAll.owl</a>
+   */
+  @Override
+  public void load() {
+    URL ontURL = null;
+    try {
+      ontURL = new URL("https://raw.githubusercontent.com/ESIPFed/sweet/master/2.4/sweetAll.owl");
+      //ontURL = new URL("https://raw.githubusercontent.com/ESIPFed/sweet/master/2.4/reprDataProduct.owl");
+    } catch (MalformedURLException e) {
+      LOG.error("Error when attempting to create URL resource: ", e);
+    }
+    ontArrayList = new ArrayList<>();
+    if (ontURL != null) {
+      try {
+        ontArrayList.add(ontURL.toURI().toString());
+      } catch (URISyntaxException e) {
+        LOG.error("Error in URL syntax, please check your Ontology resource: ", e);
+      }
+    }
+    if (!ontArrayList.isEmpty()) {
+      load(ontArrayList.stream().toArray(String[]::new));
+    }
+  }
+
+  /**
+   * Load a string array of URIs which reference .owl files.
+   *
+   * @param urls location(s) of the OWL file(s) to load
+   */
+  @Override
+  public void load(String[] urls) {
+    for (String s : urls) {
+      String url = s.trim();
+      if (!"".equals(url)) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Reading and processing {}", url);
+        }
+        load(ontologyModel, url);
+      }
+    }
+    parser.parse(ontology, ontologyModel);
+  }
+
+  private void load(Object m, String url) {
+    try {
+      ((OntModel) m).read(url, null, null);
+      LOG.info("Successfully processed {}", url);
+    } catch (Exception e) {
+      LOG.error("Failed whilst attempting to read ontology {}: Error: ", url, e);
+    }
+  }
+
+  /**
+   * Get the {@link gov.nasa.jpl.mudrod.ontology.process.OntologyParser}
+   * implementation being used to process the input ontology resources.
+   * @return an {@link gov.nasa.jpl.mudrod.ontology.process.OntologyParser} implementation
+   */
+  public OntologyParser getParser() {
+    if (parser == null) {
+      parser = new OwlParser();
+    }
+    return parser;
+  }
+
+  /**
+   * Return the {@link org.apache.jena.ontology.OntModel} instance
+   * which was created from the input ontology resources.
+   * @return a constructed {@link org.apache.jena.ontology.OntModel}
+   */
+  public static OntModel getModel() {
+    return ontologyModel;
+  }
+
+  /**
+   * Return the loaded Ontology resources.
+   * @return a {@link java.util.List} of resources.
+   */
+  public List<String> getLoadedOntologyResources() {
+    if (ontArrayList != null) {
+      return ontArrayList;
+    } else {
+      return new ArrayList<>();
+    }
+  }
+  /**
+   * Not yet implemented.
+   */
+  @Override
+  public void merge(Ontology o) {
+    // not yet implemented
+  }
+
+  /**
+   * Retrieve all subclasses of entity(ies) hashed to searchTerm
+   * @param entitySearchTerm a query (keywords) for which to obtain
+   * subclasses.
+   * @return an {@link java.util.Iterator} containing the subclasses as Strings.
+   */
+  @Override
+  public Iterator<String> subclasses(String entitySearchTerm) {
+    Map<OntResource, String> classMap = retrieve(entitySearchTerm);
+    Map<String, String> subclasses = new HashMap<>();
+
+    Iterator<OntResource> iter = classMap.keySet().iterator();
+    while (iter.hasNext()) {
+      OntResource resource = iter.next();
+
+      if (resource instanceof OntClass) {
+        //get subclasses N.B. the no-arg listSubClasses() walks the entire
+        //subclass closure; to restrict the traversal to direct children
+        //only, pass the boolean parameter .listSubClasses(true).
+        for (Iterator<?> i = ((OntClass) resource).listSubClasses(); i.hasNext();) {
+          OntResource subclass = (OntResource) i.next();
+          for (Iterator<?> j = subclass.listLabels(null); j.hasNext();) {
+            Literal l = (Literal) j.next();
+            subclasses.put(l.toString(), "1");
+          }
+        }
+        //get individuals
+        for (Iterator<?> i = ((OntClass) resource).listInstances(); i.hasNext(); ) {
+          OntResource subclass = (OntResource) i.next();
+          for (Iterator<?> j = subclass.listLabels(null); j.hasNext();) {
+            Literal l = (Literal) j.next();
+            subclasses.put(l.toString(), "1");
+          }
+        }
+      } else if (resource instanceof Individual) {
+        for (Iterator<?> i = resource.listSameAs(); i.hasNext();) {
+          OntResource subclass = (OntResource) i.next();
+          for (Iterator<?> j = subclass.listLabels(null); j.hasNext();) {
+            Literal l = (Literal) j.next();
+            subclasses.put(l.toString(), "1");
+          }
+        }
+      }
+    }
+    return subclasses.keySet().iterator();
+  }
+
+  /**
+   * Retrieves synonyms for a given phrase if the phrase
+   * is present in the ontology.
+   * @param queryKeyPhrase an input string representing a phrase
+   * for which we wish to obtain synonyms.
+   * @return an {@link java.util.Iterator} containing synonym string tokens,
+   * or an empty iterator if no synonyms exist for the given queryKeyPhrase.
+   */
+  @Override
+  public Iterator synonyms(String queryKeyPhrase) {
+
+    Map<?, ?> classMap = retrieve(queryKeyPhrase);
+
+    Map<Object, Object> synonyms = new HashMap<>();
+
+    Iterator<?> iter = classMap.keySet().iterator();
+    while (iter.hasNext()) {
+      OntResource resource = (OntResource) iter.next();
+
+      //listLabels
+      for (Iterator<?> i = resource.listLabels(null); i.hasNext();) {
+        Literal l = (Literal) i.next();
+        synonyms.put(l.toString(), "1");
+      }
+
+      if (resource instanceof Individual) {
+        //get all individuals same as this one
+        for (Iterator<?> i = resource.listSameAs(); i.hasNext();) {
+          Individual individual = (Individual) i.next();
+          //add labels
+          for (Iterator<?> j = individual.listLabels(null); j.hasNext();) {
+            Literal l = (Literal) j.next();
+            synonyms.put(l.toString(), "1");
+          }
+        }
+      } else if (resource instanceof OntClass) {
+        //list equivalent classes
+        for (Iterator<?> i = ((OntClass) resource).listEquivalentClasses(); i.hasNext();) {
+          OntClass equivClass = (OntClass) i.next();
+          //add labels
+          for (Iterator<?> j = equivClass.listLabels(null); j.hasNext();) {
+            Literal l = (Literal) j.next();
+            synonyms.put(l.toString(), "1");
+          }
+        }
+      }
+    }
+
+    return synonyms.keySet().iterator();
+  }
+
+  public void addSearchTerm(String label, OntResource resource) {
+    Map<OntResource, String> m = retrieve(label);
+    if (m == null) {
+      m = new HashMap<>();
+    }
+    m.put(resource, "1");
+    searchTerms.put(label.toLowerCase(), m);
+  }
+
+  /**
+   * A basic lookup function for retrieving keys (phrases or tokens)
+   * from the ontology search terms map. Currently only exact lookups
+   * retrieve a result; this could be improved with more advanced
+   * parsing logic, such as the Lucene query parser.
+   * @param label the label (phrases or tokens) to retrieve from the 
+   * ontology search terms map.
+   * @return an {@link java.util.Map} if there are match(es)
+   * or an empty {@link java.util.HashMap} if there are no
+   * matches.
+   */
+  public Map<OntResource, String> retrieve(String label) {
+    @SuppressWarnings("unchecked")
+    Map<OntResource, String> m = (Map<OntResource, String>) searchTerms.get(label.toLowerCase());
+    if (m == null) {
+      m = new HashMap<>();
+    }
+    return m;
+  }
+
+  protected static void renderHierarchy(PrintStream out, OntClass cls, List<Object> occurs, int depth) {
+    renderClassDescription(out, cls, depth);
+    out.println();
+
+    // recurse to the next level down
+    if (cls.canAs(OntClass.class) && !occurs.contains(cls)) {
+      for (Iterator<?> i = cls.listSubClasses(true); i.hasNext(); ) {
+        OntClass sub = (OntClass) i.next();
+
+        // we push this expression on the occurs list before we recurse
+        occurs.add(cls);
+        renderHierarchy(out, sub, occurs, depth + 1);
+        occurs.remove(cls);
+      }
+      for (Iterator<?> i = cls.listInstances(); i.hasNext(); ) {
+        Individual individual = (Individual) i.next();
+        renderURI(out, individual.getModel(), individual.getURI());
+        out.print(" [");
+        for (Iterator<?> j = individual.listLabels(null); j.hasNext(); ) {
+          out.print(((Literal) j.next()).getString() + ", ");
+        }
+        out.print("] ");
+        out.println();
+      }
+    }
+  }
+
+  public static void renderClassDescription(PrintStream out, OntClass c, int depth) {
+    indent(out, depth);
+
+    if (c.isRestriction()) {
+      renderRestriction(out, (Restriction) c.as(Restriction.class));
+    } else {
+      if (!c.isAnon()) {
+        out.print("Class ");
+        renderURI(out, c.getModel(), c.getURI());
+
+        out.print(c.getLocalName());
+
+        out.print(" [");
+        for (Iterator<?> i = c.listLabels(null); i.hasNext(); ) {
+          out.print(((Literal) i.next()).getString() + ", ");
+        }
+        out.print("] ");
+      } else {
+        renderAnonymous(out, c, "class");
+      }
+    }
+  }
+
+  protected static void renderRestriction(PrintStream out, Restriction r) {
+    if (!r.isAnon()) {
+      out.print("Restriction ");
+      renderURI(out, r.getModel(), r.getURI());
+    } else {
+      renderAnonymous(out, r, "restriction");
+    }
+
+    out.print(" on property ");
+    renderURI(out, r.getModel(), r.getOnProperty().getURI());
+  }
+
+  protected static void renderURI(PrintStream out, PrefixMapping prefixes, String uri) {
+    out.print(prefixes.expandPrefix(uri));
+  }
+
+  protected static void renderAnonymous(PrintStream out, Resource anon, String name) {
+    String anonID = mAnonIDs.get(anon.getId());
+    if (anonID == null) {
+      anonID = "a-" + mAnonCount++;
+      mAnonIDs.put(anon.getId(), anonID);
+    }
+
+    out.print("Anonymous ");
+    out.print(name);
+    out.print(" with ID ");
+    out.print(anonID);
+  }
+
+  protected static void indent(PrintStream out, int depth) {
+    for (int i = 0; i < depth; i++) {
+      out.print(" ");
+    }
+  }
+
+}
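
For reference, a minimal usage sketch of the LocalOntology class above (a
sketch, not part of the commit; it assumes the hard-coded SWEET URL is
reachable from the build machine):

    import gov.nasa.jpl.mudrod.ontology.process.LocalOntology;

    import java.util.Iterator;

    public class LocalOntologyDemo {
      public static void main(String[] args) {
        // The no-arg constructor loads sweetAll.owl from the SWEET GitHub URL.
        LocalOntology ontology = new LocalOntology();
        // Synonyms are gathered from labels on the same resource, sameAs
        // individuals, or equivalent classes.
        for (Iterator<?> it = ontology.synonyms("sea surface temperature"); it.hasNext(); ) {
          System.out.println("synonym: " + it.next());
        }
        // Subclasses are the labels of direct subclasses and instances.
        for (Iterator<String> it = ontology.subclasses("ocean"); it.hasNext(); ) {
          System.out.println("subclass: " + it.next());
        }
      }
    }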

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyLinkCal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyLinkCal.java b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyLinkCal.java
new file mode 100644
index 0000000..a68a0cb
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyLinkCal.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.ontology.process;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Calculates ontology linkage by importing SWEET triples into Elasticsearch.
+ */
+public class OntologyLinkCal extends DiscoveryStepAbstract {
+
+  public OntologyLinkCal(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    es.deleteAllByQuery(props.getProperty("indexName"), props.getProperty("ontologyLinkageType"), QueryBuilders.matchAllQuery());
+    addSWEETMapping();
+  }
+
+  /**
+   * Method of adding mapping for triples extracted from SWEET
+   */
+  public void addSWEETMapping() {
+    XContentBuilder mapping;
+    try {
+      mapping = jsonBuilder().startObject().startObject(props.getProperty("ontologyLinkageType")).startObject("properties").startObject("concept_A").field("type", "string")
+          .field("index", "not_analyzed").endObject().startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
+          .endObject().endObject().endObject();
+
+      es.getClient().admin().indices().preparePutMapping(props.getProperty("indexName")).setType(props.getProperty("ontologyLinkageType")).setSource(mapping).execute().actionGet();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Method of calculating and importing SWEET triples into Elasticsearch
+   */
+  @Override
+  public Object execute() {
+    es.deleteType(props.getProperty("indexName"), props.getProperty("ontologyLinkageType"));
+    es.createBulkProcessor();
+
+    BufferedReader br = null;
+    String line = "";
+    double weight = 0;
+
+    try {
+      br = new BufferedReader(new FileReader(props.getProperty("oceanTriples")));
+      while ((line = br.readLine()) != null) {
+        String[] strList = line.toLowerCase().split(",");
+        if (strList.length < 3) {
+          continue; //skip malformed lines
+        }
+        if ("subclassof".equals(strList[1])) {
+          weight = 0.75;
+        } else {
+          weight = 0.9;
+        }
+
+        IndexRequest ir = new IndexRequest(props.getProperty("indexName"), props.getProperty("ontologyLinkageType")).source(
+            jsonBuilder().startObject().field("concept_A", es.customAnalyzing(props.getProperty("indexName"), strList[2]))
+                .field("concept_B", es.customAnalyzing(props.getProperty("indexName"), strList[0])).field("weight", weight).endObject());
+        es.getBulkProcessor().add(ir);
+
+      }
+
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      e.printStackTrace();
+    } finally {
+      if (br != null) {
+        try {
+          br.close();
+          es.destroyBulkProcessor();
+          es.refreshIndex();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}
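
execute() above assumes each line of the "oceanTriples" CSV is ordered as
subject,relation,object, with "subclassof" relations weighted lower than any
other relation. A small self-contained sketch of that rule, using a
hypothetical input line:

    public class TripleLineDemo {
      public static void main(String[] args) {
        // Hypothetical CSV line; the field order is inferred from execute() above.
        String line = "OceanTemperature,SubClassOf,Temperature";
        String[] strList = line.toLowerCase().split(",");
        double weight = "subclassof".equals(strList[1]) ? 0.75 : 0.9;
        // concept_A is the object (strList[2]); concept_B is the subject (strList[0]).
        System.out.println("concept_A=" + strList[2]
            + " concept_B=" + strList[0] + " weight=" + weight);
        // -> concept_A=temperature concept_B=oceantemperature weight=0.75
      }
    }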

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyParser.java b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyParser.java
new file mode 100644
index 0000000..eca6252
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OntologyParser.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.ontology.process;
+
+import org.apache.jena.ontology.OntClass;
+import org.apache.jena.ontology.OntModel;
+
+import gov.nasa.jpl.mudrod.ontology.Ontology;
+
+import java.util.Iterator;
+
+/**
+ * Interface for specific ontology parsers e.g. .ttl, RDFXML,
+ * etc.
+ */
+public interface OntologyParser {
+
+  /**
+   * An ontology model (RDF graph) to parse for literals.
+   *
+   * @param ont the associated {@link gov.nasa.jpl.mudrod.ontology.Ontology}
+   * implementation processing the ontology operation(s).
+   * @param ontModel the {@link org.apache.jena.ontology.OntModel}
+   */
+  public void parse(Ontology ont, OntModel ontModel);
+
+  /**
+   * An ontology model (RDF graph) for which to obtain an
+   * {@link java.util.Iterator} instance of all root classes.
+   *
+   * @param ontModel the {@link org.apache.jena.ontology.OntModel}
+   * @return an {@link java.util.Iterator} instance containing all root classes.
+   */
+  public Iterator<OntClass> rootClasses(OntModel ontModel);
+
+}
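
For illustration, a minimal alternative implementation of this interface (a
sketch, not part of the commit), leaning on Jena's built-in
listHierarchyRootClasses() rather than the manual root scan that OwlParser
below performs:

    package gov.nasa.jpl.mudrod.ontology.process;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.jena.ontology.OntClass;
    import org.apache.jena.ontology.OntModel;

    import gov.nasa.jpl.mudrod.ontology.Ontology;

    public class HierarchyRootParser implements OntologyParser {

      @Override
      public void parse(Ontology ont, OntModel ontModel) {
        // A real parser would register class labels as search terms on ont
        // (see OwlParser below); this sketch only walks the roots.
        for (Iterator<OntClass> i = rootClasses(ontModel); i.hasNext(); ) {
          System.out.println("root class: " + i.next().getLocalName());
        }
      }

      @Override
      public Iterator<OntClass> rootClasses(OntModel ontModel) {
        List<OntClass> roots = new ArrayList<>();
        for (Iterator<OntClass> i = ontModel.listHierarchyRootClasses(); i.hasNext(); ) {
          OntClass c = i.next();
          if (!c.isAnon()) { // skip anonymous classes, as OwlParser does
            roots.add(c);
          }
        }
        return roots.iterator();
      }
    }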

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OwlParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OwlParser.java b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OwlParser.java
new file mode 100644
index 0000000..e43f04d
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/OwlParser.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.ontology.process;
+
+import org.apache.jena.ontology.Individual;
+import org.apache.jena.ontology.OntClass;
+import org.apache.jena.ontology.OntModel;
+import org.apache.jena.rdf.model.Literal;
+
+import com.esotericsoftware.minlog.Log;
+
+import gov.nasa.jpl.mudrod.ontology.Ontology;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * {@link gov.nasa.jpl.mudrod.ontology.process.OntologyParser}
+ * implementation for <a href="http://www.w3.org/TR/owl-features/">W3C OWL</a> 
+ * files.
+ */
+public class OwlParser implements OntologyParser {
+  
+  private Ontology ont;
+  private List<OntClass> roots = new ArrayList<>();
+
+  public OwlParser() {
+    //default constructor
+  }
+
+  /**
+   * Parse OWL ontology files using Apache Jena
+   */
+  @Override
+  public void parse(Ontology ont, OntModel m) {
+    this.ont = ont;
+    for (Iterator<OntClass> i = rootClasses(m); i.hasNext(); ) {
+      OntClass c = i.next();
+
+      //don't deal with anonymous classes
+      if (c.isAnon()) {
+        continue;
+      }
+
+      parseClass(c, new ArrayList<>(), 0);
+    }
+  }
+
+  protected void parseClass(OntClass cls, List<Object> occurs, int depth) {
+    //don't deal with anonymous classes
+    if (cls.isAnon()) {
+      return;
+    }
+
+    //add cls to Ontology searchterms
+    //list labels
+    Iterator<?> labelIter = cls.listLabels(null);
+    //if has no labels
+    if (!labelIter.hasNext()) {
+      //add rdf:ID as a label
+      cls.addLabel(rdfidToLabel(cls.getLocalName()), null);
+    }
+    //reset the label iterator
+    labelIter = cls.listLabels(null);
+
+    while (labelIter.hasNext()) {
+      Literal l = (Literal) labelIter.next();
+      ((LocalOntology) ont).addSearchTerm(l.toString(), cls);
+    }
+
+    // recurse to the next level down
+    if (cls.canAs(OntClass.class) && !occurs.contains(cls)) {
+      //list subclasses
+      for (Iterator<?> i = cls.listSubClasses(true); i.hasNext(); ) {
+        OntClass sub = (OntClass) i.next();
+
+        // we push this expression on the occurs list before we recurse
+        occurs.add(cls);
+        parseClass(sub, occurs, depth + 1);
+        occurs.remove(cls);
+      }
+
+      //list instances
+      for (Iterator<?> i = cls.listInstances(); i.hasNext(); ) {
+        //add search terms for each instance
+
+        //list labels
+        Individual individual = (Individual) i.next();
+        for (Iterator<?> j = individual.listLabels(null); j.hasNext(); ) {
+          Literal l = (Literal) j.next();
+          ((LocalOntology) ont).addSearchTerm(l.toString(), individual);
+        }
+      }
+    }
+  }
+
+  /**
+   * Parses out all root classes of the given 
+   * {@link org.apache.jena.ontology.OntModel}
+   * @param m the {@link org.apache.jena.ontology.OntModel} we wish to obtain 
+   * all root classes for.
+   * @return an {@link java.util.Iterator} of {@link org.apache.jena.ontology.OntClass}
+   * elements representing all root classes.
+   */
+  @Override
+  public Iterator<OntClass> rootClasses(OntModel m) {
+    Iterator<?> i = m.listClasses();
+    if (i.hasNext() && i.next() instanceof OntClass) {
+      //assume ontology has root classes
+      processSingle(m);
+    } else {
+      //check for presence of aggregate/collection ontologies such as sweetAll.owl
+      processCollection(m);
+    }
+
+    return roots.iterator();
+  }
+
+  private void processSingle(OntModel m) {
+    for (Iterator<?> i = m.listClasses(); i.hasNext(); ) {
+      OntClass c = (OntClass) i.next();
+      try {
+        // too confusing to list all the restrictions as root classes 
+        if (c.isAnon()) {
+          continue;
+        }
+
+        if (c.hasSuperClass(m.getProfile().THING(), true) || c.getCardinality(m.getProfile().SUB_CLASS_OF()) == 0) {
+          // this class is directly descended from Thing
+          roots.add(c);
+        }
+      } catch (Exception e) {
+        Log.error("Error during extraction or root Classes from Ontology Model: ", e);
+      }
+    }
+  }
+
+  private void processCollection(OntModel m) {
+    for (Iterator<?> i = m.listSubModels(true); i.hasNext(); ) {
+      OntModel ontModel = (OntModel) i.next();
+      processSingle(ontModel);
+    }
+  }
+
+  public String rdfidToLabel(String idString) {
+    Pattern p = Pattern.compile("([a-z0-9])([A-Z])");
+    Matcher m = p.matcher(idString);
+
+    String labelString = idString;
+    while (m.find()) {
+      labelString = labelString.replaceAll(m.group(1) + m.group(2), m.group(1) + " " + m.group(2));
+    }
+    return labelString;
+  }
+
+}
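
rdfidToLabel() above splits camelCase rdf:IDs into space-separated labels. A
quick self-contained check of its behaviour (not part of the commit):

    import gov.nasa.jpl.mudrod.ontology.process.OwlParser;

    public class RdfIdToLabelDemo {
      public static void main(String[] args) {
        OwlParser p = new OwlParser();
        // A lowercase letter or digit followed by an uppercase letter gets a
        // space inserted between the two.
        System.out.println(p.rdfidToLabel("SeaSurfaceTemperature")); // Sea Surface Temperature
        System.out.println(p.rdfidToLabel("ocean"));                 // ocean (unchanged)
      }
    }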

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/package-info.java
new file mode 100644
index 0000000..3447426
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ontology/process/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes ontology processing classes.
+ */
+package gov.nasa.jpl.mudrod.ontology.process;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/package-info.java
new file mode 100644
index 0000000..1e5d8bf
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing, processing, and data structure used
+ * by recommendation module.
+ */
+package gov.nasa.jpl.mudrod.recommendation;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/ImportMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/ImportMetadata.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/ImportMetadata.java
new file mode 100644
index 0000000..c174f31
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/ImportMetadata.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.recommendation.pre;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.apache.commons.io.IOUtils;
+import org.elasticsearch.action.index.IndexRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.Properties;
+
+/**
+ * ClassName: ImportMetadata Function: Import metadata into Elasticsearch.
+ */
+
+public class ImportMetadata extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ImportMetadata.class);
+
+  public ImportMetadata(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting Metadata Harvesting");
+    startTime = System.currentTimeMillis();
+    addMetadataMapping();
+    importToES();
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("Finished Metadata Harvesting time elapsed: {}s", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  /**
+   * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please
+   * invoke this method before importing metadata into Elasticsearch.
+   */
+  public void addMetadataMapping() {
+    String mappingJson = "{\r\n   \"dynamic_templates\": " + "[\r\n      " + "{\r\n         \"strings\": " + "{\r\n            \"match_mapping_type\": \"string\","
+        + "\r\n            \"mapping\": {\r\n               \"type\": \"string\"," + "\r\n               \"analyzer\": \"csv\"\r\n            }" + "\r\n         }\r\n      }\r\n   ]\r\n}";
+
+    es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(props.getProperty("recom_metadataType")).setSource(mappingJson).execute().actionGet();
+
+  }
+
+  /**
+   * importToES: Index metadata into Elasticsearch from a local file directory.
+   * Please make sure the metadata has been harvested from the web service before
+   * invoking this method.
+   */
+  private void importToES() {
+    es.deleteType(props.getProperty("indexName"), props.getProperty("recom_metadataType"));
+
+    es.createBulkProcessor();
+    File directory = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
+    File[] fList = directory.listFiles();
+    if (fList == null) {
+      LOG.error("Raw metadata directory {} is missing or not a directory", directory);
+      return;
+    }
+    for (File file : fList) {
+      //try-with-resources ensures the input stream is always closed
+      try (InputStream is = new FileInputStream(file)) {
+        String jsonTxt = IOUtils.toString(is);
+        JsonParser parser = new JsonParser();
+        JsonElement item = parser.parse(jsonTxt);
+        IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty("recom_metadataType")).source(item.toString());
+
+        // preprocess data
+
+        es.getBulkProcessor().add(ir);
+      } catch (IOException e) {
+        LOG.error("Failed to import metadata file {}: ", file, e);
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+}
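
For readability, the mapping JSON string assembled in addMetadataMapping()
above expands to the following document, which maps every dynamically created
string field to the custom "csv" analyzer (presumably defined in the Mudrod
index settings):

    {
       "dynamic_templates": [
          {
             "strings": {
                "match_mapping_type": "string",
                "mapping": {
                   "type": "string",
                   "analyzer": "csv"
                }
             }
          }
       ]
    }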

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/MetadataTFIDFGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
new file mode 100644
index 0000000..02c74f0
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/MetadataTFIDFGenerator.java
@@ -0,0 +1,100 @@
+/**
+ * Project Name: mudrod-core
+ * File Name: MetadataTFIDFGenerator.java
+ * Package Name: gov.nasa.jpl.mudrod.recommendation.pre
+ * Date: Aug 22, 2016 12:39:52 PM
+ * Copyright (c) 2016, chenzhou1025@126.com All Rights Reserved.
+ */
+
+package gov.nasa.jpl.mudrod.recommendation.pre;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.recommendation.structure.MetadataOpt;
+import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
+import gov.nasa.jpl.mudrod.utils.MatrixUtil;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * ClassName: MetadataTFIDFGenerator Function: Generate TF-IDF information for all metadata.
+ */
+public class MetadataTFIDFGenerator extends DiscoveryStepAbstract {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(MetadataTFIDFGenerator.class);
+
+  /**
+   * Creates a new instance of MetadataTFIDFGenerator.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch driver
+   * @param spark the Spark driver
+   */
+  public MetadataTFIDFGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+
+    LOG.info("Starting Dataset TF_IDF Matrix Generator");
+    startTime = System.currentTimeMillis();
+    try {
+      generateWordBasedTFIDF();
+    } catch (Exception e) {
+      LOG.error("Error during Dataset TF_IDF Matrix Generation: {}", e);
+    }
+    endTime = System.currentTimeMillis();
+
+    LOG.info("Dataset TF_IDF Matrix Generation complete, time elaspsed: {}s", (endTime - startTime) / 1000);
+
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public LabeledRowMatrix generateWordBasedTFIDF() throws Exception {
+
+    MetadataOpt opt = new MetadataOpt(props);
+
+    JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark);
+
+    JavaPairRDD<String, List<String>> metadataWords = opt.tokenizeData(metadataContents, " ");
+
+    LabeledRowMatrix wordtfidfMatrix = opt.tFIDFTokens(metadataWords, spark);
+
+    MatrixUtil.exportToCSV(wordtfidfMatrix.rowMatrix, wordtfidfMatrix.rowkeys, wordtfidfMatrix.colkeys, props.getProperty("metadata_word_tfidf_matrix"));
+
+    return wordtfidfMatrix;
+  }
+
+  public LabeledRowMatrix generateTermBasedTFIDF() throws Exception {
+
+    MetadataOpt opt = new MetadataOpt(props);
+
+    List<String> variables = new ArrayList<>();
+    variables.add("DatasetParameter-Term");
+    variables.add("DatasetParameter-Variable");
+    variables.add("Dataset-ExtractTerm");
+
+    JavaPairRDD<String, String> metadataContents = opt.loadAll(es, spark, variables);
+
+    JavaPairRDD<String, List<String>> metadataTokens = opt.tokenizeData(metadataContents, ",");
+
+    LabeledRowMatrix tokentfidfMatrix = opt.tFIDFTokens(metadataTokens, spark);
+
+    MatrixUtil.exportToCSV(tokentfidfMatrix.rowMatrix, tokentfidfMatrix.rowkeys, tokentfidfMatrix.colkeys, props.getProperty("metadata_term_tfidf_matrix"));
+
+    return tokentfidfMatrix;
+  }
+}
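
For context: tFIDFTokens() presumably applies the standard TF-IDF weighting,
tfidf(t, d) = tf(t, d) * log(N / df(t)), where tf(t, d) is the frequency of
token t in metadata document d, N is the number of documents, and df(t) the
number of documents containing t (Spark MLlib's IDF variant uses
log((N + 1) / (df(t) + 1))). The two generators above differ only in
tokenization: generateWordBasedTFIDF() splits the metadata text on
whitespace, while generateTermBasedTFIDF() splits the comma-separated
parameter fields it lists.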

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/NormalizeVariables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/NormalizeVariables.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/NormalizeVariables.java
new file mode 100644
index 0000000..f5eaa9c
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/NormalizeVariables.java
@@ -0,0 +1,223 @@
+package gov.nasa.jpl.mudrod.recommendation.pre;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+public class NormalizeVariables extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(NormalizeVariables.class);
+  // index name
+  private String indexName;
+  // type name of metadata in ES
+  private String metadataType;
+
+  /**
+   * Creates a new instance of NormalizeVariables.
+   *
+   * @param props the Mudrod configuration
+   * @param es    an instantiated {@link ESDriver}
+   * @param spark an instantiated {@link SparkDriver}
+   */
+  public NormalizeVariables(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    indexName = props.getProperty("indexName");
+    metadataType = props.getProperty("recom_metadataType");
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("*****************processing metadata variables starts******************");
+    startTime = System.currentTimeMillis();
+
+    normalizeMetadataVariables(es);
+
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************processing metadata variables ends******************Took {}s", (endTime - startTime) / 1000);
+
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public void normalizeMetadataVariables(ESDriver es) {
+
+    es.createBulkProcessor();
+
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> metadata = hit.getSource();
+        Map<String, Object> updatedValues = new HashMap<>();
+
+        this.normalizeSpatialVariables(metadata, updatedValues);
+        this.normalizeTemporalVariables(metadata, updatedValues);
+        this.normalizeOtherVariables(metadata, updatedValues);
+
+        UpdateRequest ur = es.generateUpdateRequest(indexName, metadataType, hit.getId(), updatedValues);
+        es.getBulkProcessor().add(ur);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+
+  private void normalizeOtherVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+    String shortname = (String) metadata.get("Dataset-ShortName");
+    double versionNum = getVersionNum(shortname);
+    updatedValues.put("Dataset-Derivative-VersionNum", versionNum);
+
+  }
+
+  private Double getVersionNum(String version) {
+    if (version == null) {
+      return 0.0;
+    }
+    Double versionNum = 0.0;
+    Pattern p = Pattern.compile(".*[a-zA-Z].*");
+    if ("Operational/Near-Real-Time".equals(version)) {
+      versionNum = 2.0;
+    } else if (version.matches("[0-9]{1}[a-zA-Z]{1}")) {
+      versionNum = Double.parseDouble(version.substring(0, 1));
+    } else if (p.matcher(version).find()) {
+      versionNum = 0.0;
+    } else {
+      versionNum = Double.parseDouble(version);
+      if (versionNum >= 5) {
+        versionNum = 20.0;
+      }
+    }
+    return versionNum;
+  }
+
+  private void normalizeSpatialVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+
+    // get spatial resolution
+    Double spatialR;
+    if (metadata.get("Dataset-SatelliteSpatialResolution") != null) {
+      spatialR = (Double) metadata.get("Dataset-SatelliteSpatialResolution");
+    } else {
+      Double gridR = (Double) metadata.get("Dataset-GridSpatialResolution");
+      if (gridR != null) {
+        spatialR = 111 * gridR;
+      } else {
+        spatialR = 25.0;
+      }
+    }
+    updatedValues.put("Dataset-Derivative-SpatialResolution", spatialR);
+
+    // Transform Longitude and calculate coverage area
+    double top = parseDouble((String) metadata.get("DatasetCoverage-NorthLat"));
+    double bottom = parseDouble((String) metadata.get("DatasetCoverage-SouthLat"));
+    double left = parseDouble((String) metadata.get("DatasetCoverage-WestLon"));
+    double right = parseDouble((String) metadata.get("DatasetCoverage-EastLon"));
+
+    if (left > 180) {
+      left = left - 360;
+    }
+
+    if (right > 180) {
+      right = right - 360;
+    }
+
+    if (left == right) {
+      left = -180;
+      right = 180;
+    }
+
+    double area = (top - bottom) * (right - left);
+
+    updatedValues.put("DatasetCoverage-Derivative-EastLon", right);
+    updatedValues.put("DatasetCoverage-Derivative-WestLon", left);
+    updatedValues.put("DatasetCoverage-Derivative-NorthLat", top);
+    updatedValues.put("DatasetCoverage-Derivative-SouthLat", bottom);
+    updatedValues.put("DatasetCoverage-Derivative-Area", area);
+
+    // get processing level
+    String processingLevel = (String) metadata.get("Dataset-ProcessingLevel");
+    double dProLevel = this.getProLevelNum(processingLevel);
+    updatedValues.put("Dataset-Derivative-ProcessingLevel", dProLevel);
+  }
+
+  private void normalizeTemporalVariables(Map<String, Object> metadata, Map<String, Object> updatedValues) {
+
+    String trStr = (String) metadata.get("Dataset-TemporalResolution");
+    if (trStr == null || "".equals(trStr)) {
+      trStr = (String) metadata.get("Dataset-TemporalRepeat");
+    }
+
+    updatedValues.put("Dataset-Derivative-TemporalResolution", covertTimeUnit(trStr));
+  }
+
+  private Double convertTimeUnit(String str) {
+    Double timeInHour;
+    if (str.contains("Hour")) {
+      timeInHour = Double.parseDouble(str.split(" ")[0]);
+    } else if (str.contains("Day")) {
+      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24;
+    } else if (str.contains("Week")) {
+      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7;
+    } else if (str.contains("Month")) {
+      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30;
+    } else if (str.contains("Year")) {
+      timeInHour = Double.parseDouble(str.split(" ")[0]) * 24 * 7 * 30 * 365;
+    } else {
+      timeInHour = 0.0;
+    }
+
+    return timeInHour;
+  }
+
+  public Double getProLevelNum(String pro) {
+    if (pro == null) {
+      return 1.0;
+    }
+    Double proNum = 0.0;
+    Pattern p = Pattern.compile(".*[a-zA-Z].*");
+    if (pro.matches("[0-9]{1}[a-zA-Z]{1}")) {
+      proNum = Double.parseDouble(pro.substring(0, 1));
+    } else if (p.matcher(pro).find()) {
+      proNum = 1.0;
+    } else {
+      proNum = Double.parseDouble(pro);
+    }
+
+    return proNum;
+  }
+
+  private double parseDouble(String strNumber) {
+    if (strNumber != null && strNumber.length() > 0) {
+      try {
+        return Double.parseDouble(strNumber);
+      } catch (Exception e) {
+        return -1;
+      }
+    } else
+      return 0;
+  }
+}
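
The temporal normalization above reduces a resolution string such as "1 Day"
to hours. A self-contained restatement of the conversion rule with worked
values (a sketch, not part of the commit):

    public class TemporalUnitDemo {
      // Mirrors convertTimeUnit() above: "N <Unit>" -> hours.
      static double hours(String trStr) {
        double n = Double.parseDouble(trStr.split(" ")[0]);
        if (trStr.contains("Hour"))  return n;
        if (trStr.contains("Day"))   return n * 24;
        if (trStr.contains("Week"))  return n * 24 * 7;
        if (trStr.contains("Month")) return n * 24 * 30;
        if (trStr.contains("Year"))  return n * 24 * 365;
        return 0.0;
      }

      public static void main(String[] args) {
        System.out.println(hours("1 Day"));   // 24.0
        System.out.println(hours("2 Week"));  // 336.0
        System.out.println(hours("1 Month")); // 720.0
      }
    }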

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/SessionCooccurence.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/SessionCooccurence.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/SessionCooccurence.java
new file mode 100644
index 0000000..2aecce3
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/SessionCooccurence.java
@@ -0,0 +1,152 @@
+/**
+ * Project Name: mudrod-core
+ * File Name: SessionCooccurence.java
+ * Package Name: gov.nasa.jpl.mudrod.recommendation.pre
+ * Date: Aug 19, 2016 3:06:33 PM
+ * Copyright (c) 2016, chenzhou1025@126.com All Rights Reserved.
+ */
+
+package gov.nasa.jpl.mudrod.recommendation.pre;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
+import gov.nasa.jpl.mudrod.utils.MatrixUtil;
+import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * ClassName: SessionCooccurence Function: Generate a metadata session
+ * co-occurrence matrix from web logs. Each row in the matrix corresponds to
+ * a metadata record, and each column to a session.
+ */
+public class SessionCooccurence extends DiscoveryStepAbstract {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(SessionCooccurence.class);
+
+  /**
+   * Creates a new instance of SessionCooccurence.
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @param spark
+   *          the spark driver
+   */
+  public SessionCooccurence(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+
+    LOG.info("Starting dataset session-based similarity generation...");
+
+    startTime = System.currentTimeMillis();
+
+    // get all metadata session co-occurrence data
+    SessionExtractor extractor = new SessionExtractor();
+    JavaPairRDD<String, List<String>> sessionDatasetRDD = extractor.bulidSessionDatasetRDD(props, es, spark);
+
+    // remove retired datasets
+    JavaPairRDD<String, List<String>> sessionFiltedDatasetsRDD = removeRetiredDataset(es, sessionDatasetRDD);
+    LabeledRowMatrix datasetSessionMatrix = MatrixUtil.createWordDocMatrix(sessionFiltedDatasetsRDD);
+
+    // export
+    MatrixUtil.exportToCSV(datasetSessionMatrix.rowMatrix, datasetSessionMatrix.rowkeys, datasetSessionMatrix.colkeys, props.getProperty("session_metadata_Matrix"));
+
+    endTime = System.currentTimeMillis();
+
+    LOG.info("Completed dataset session-based  similarity generation. Time elapsed: {}s", (endTime - startTime) / 1000);
+
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  /**
+   * Filter out retired (out-of-date) metadata.
+   *
+   * @param es
+   *          the Elasticsearch driver
+   * @param userDatasetsRDD
+   *          datasets extracted from sessions
+   * @return filtered session datasets
+   */
+  public JavaPairRDD<String, List<String>> removeRetiredDataset(ESDriver es, JavaPairRDD<String, List<String>> userDatasetsRDD) {
+
+    Map<String, String> nameMap = this.getOnServiceMetadata(es);
+
+    return userDatasetsRDD.mapToPair(new PairFunction<Tuple2<String, List<String>>, String, List<String>>() {
+      /**
+       * 
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, List<String>> call(Tuple2<String, List<String>> arg0) throws Exception {
+        List<String> oriDatasets = arg0._2;
+        List<String> newDatasets = new ArrayList<>();
+        int size = oriDatasets.size();
+        for (int i = 0; i < size; i++) {
+          String name = oriDatasets.get(i);
+          if (nameMap.containsKey(name)) {
+            newDatasets.add(nameMap.get(name));
+          }
+        }
+        return new Tuple2<>(arg0._1, newDatasets);
+      }
+    });
+
+  }
+
+  /**
+   * getOnServiceMetadata: Get on-service metadata names; the key is the
+   * lowercase short name and the value is the original short name.
+   *
+   * @param es
+   *          the elasticsearch client
+   * @return a map from lower case metadata name to original metadata name
+   */
+  private Map<String, String> getOnServiceMetadata(ESDriver es) {
+
+    String indexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
+    String metadataType = props.getProperty("recom_metadataType");
+
+    Map<String, String> shortnameMap = new HashMap<>();
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> metadata = hit.getSource();
+        String shortName = (String) metadata.get("Dataset-ShortName");
+        if (shortName != null) {
+          shortnameMap.put(shortName.toLowerCase(), shortName);
+        }
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    return shortnameMap;
+  }
+
+}
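
To make the matrix orientation concrete: if session s1 viewed datasets A and
B, and session s2 viewed B and C, then (per the class comment above, with
rows as datasets and columns as sessions) createWordDocMatrix() yields

         s1   s2
    A     1    0
    B     1    1
    C     0    1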

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/package-info.java
new file mode 100644
index 0000000..2febf96
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/pre/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the preprocessing required by recommendation module.
+ */
+package gov.nasa.jpl.mudrod.recommendation.pre;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/AbstractBasedSimilarity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/AbstractBasedSimilarity.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/AbstractBasedSimilarity.java
new file mode 100644
index 0000000..b0e93fc
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/AbstractBasedSimilarity.java
@@ -0,0 +1,74 @@
+/**
+ * Project Name: mudrod-core
+ * File Name: AbstractBasedSimilarity.java
+ * Package Name: gov.nasa.jpl.mudrod.recommendation.process
+ * Date: Aug 22, 2016 10:45:55 AM
+ * Copyright (c) 2016, chenzhou1025@126.com All Rights Reserved.
+ */
+
+package gov.nasa.jpl.mudrod.recommendation.process;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer;
+import gov.nasa.jpl.mudrod.utils.LinkageTriple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * ClassName: AbstractBasedSimilarity Function: Recommend metadata based on data content semantic similarity.
+ */
+public class AbstractBasedSimilarity extends DiscoveryStepAbstract {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractBasedSimilarity.class);
+
+  /**
+   * Creates a new instance of AbstractBasedSimilarity.
+   *
+   * @param props the Mudrod configuration
+   * @param es    the Elasticsearch client
+   * @param spark the Spark driver
+   */
+  public AbstractBasedSimilarity(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+
+    LOG.info("*****************abstract similarity calculation starts******************");
+    startTime = System.currentTimeMillis();
+
+    try {
+      /*String topicMatrixFile = props.getProperty("metadata_term_tfidf_matrix");
+      SemanticAnalyzer analyzer = new SemanticAnalyzer(props, es, spark);
+      List<LinkageTriple> triples = analyzer
+          .calTermSimfromMatrix(topicMatrixFile);
+      analyzer.saveToES(triples, props.getProperty("indexName"),
+          props.getProperty("metadataTermTFIDFSimType"), true, true);*/
+
+      // for comparison
+      SVDAnalyzer svd = new SVDAnalyzer(props, es, spark);
+      svd.getSVDMatrix(props.getProperty("metadata_word_tfidf_matrix"), 150, props.getProperty("metadata_word_tfidf_matrix"));
+      List<LinkageTriple> tripleList = svd.calTermSimfromMatrix(props.getProperty("metadata_word_tfidf_matrix"));
+      svd.saveToES(tripleList, props.getProperty("indexName"), props.getProperty("metadataWordTFIDFSimType"), true, true);
+
+    } catch (Exception e) {
+      LOG.error("Error during abstract similarity calculation: ", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************abstract similarity calculation ends******************Took {}s", (endTime - startTime) / 1000);
+
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+}
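
For context: getSVDMatrix(..., 150, ...) above presumably reduces the word
TF-IDF matrix to its top 150 singular values, X ~ U_150 S_150 V_150^T (i.e.
latent semantic analysis), so that calTermSimfromMatrix() measures term
similarity in the reduced 150-dimensional space rather than on the raw
TF-IDF rows.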

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/VariableBasedSimilarity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/VariableBasedSimilarity.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/VariableBasedSimilarity.java
new file mode 100644
index 0000000..67aeeb8
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/VariableBasedSimilarity.java
@@ -0,0 +1,380 @@
+package gov.nasa.jpl.mudrod.recommendation.process;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+public class VariableBasedSimilarity extends DiscoveryStepAbstract implements Serializable {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(VariableBasedSimilarity.class);
+
+  private DecimalFormat df = new DecimalFormat("#.000");
+  // a map from variable to its type
+  public Map<String, Integer> variableTypes;
+  public Map<String, Integer> variableWeights;
+
+  private static final Integer VAR_SPATIAL = 1;
+  private static final Integer VAR_TEMPORAL = 2;
+  private static final Integer VAR_CATEGORICAL = 3;
+  private static final Integer VAR_ORDINAL = 4;
+
+  // index name
+  private String indexName;
+  // type name of metadata in ES
+  private String metadataType;
+  private String variableSimType;
+
+  /**
+   * Creates a new instance of VariableBasedSimilarity.
+   *
+   * @param props the Mudrod configuration
+   * @param es    an instantiated {@link ESDriver}
+   * @param spark an instantiated {@link SparkDriver}
+   */
+  public VariableBasedSimilarity(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+
+    indexName = props.getProperty("indexName");
+    metadataType = props.getProperty("recom_metadataType");
+    variableSimType = props.getProperty("metadataCodeSimType");
+    this.initialize();
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("*****************calculating metadata variables based similarity starts******************");
+    startTime = System.currentTimeMillis();
+    es.deleteType(indexName, variableSimType);
+    addMapping(es, indexName, variableSimType);
+
+    calculateVariableSimilarity(es);
+    es.refreshIndex();
+    normalizeVariableWeight(es);
+    es.refreshIndex();
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************calculating metadata variables based similarity ends******************Took {}s", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public void initialize() {
+    this.initVariableType();
+    this.initVariableWeight();
+  }
+
+  private void initVariableType() {
+    variableTypes = new HashMap<>();
+
+    variableTypes.put("DatasetParameter-Variable", VAR_CATEGORICAL);
+    variableTypes.put("DatasetRegion-Region", VAR_CATEGORICAL);
+    variableTypes.put("Dataset-ProjectionType", VAR_CATEGORICAL);
+    variableTypes.put("Dataset-ProcessingLevel", VAR_CATEGORICAL);
+    variableTypes.put("DatasetParameter-Topic", VAR_CATEGORICAL);
+    variableTypes.put("DatasetParameter-Term", VAR_CATEGORICAL);
+    variableTypes.put("DatasetParameter-Category", VAR_CATEGORICAL);
+    variableTypes.put("DatasetPolicy-DataFormat", VAR_CATEGORICAL);
+    variableTypes.put("Collection-ShortName", VAR_CATEGORICAL);
+    variableTypes.put("DatasetSource-Source-Type", VAR_CATEGORICAL);
+    variableTypes.put("DatasetSource-Source-ShortName", VAR_CATEGORICAL);
+    variableTypes.put("DatasetSource-Sensor-ShortName", VAR_CATEGORICAL);
+    variableTypes.put("DatasetPolicy-Availability", VAR_CATEGORICAL);
+    variableTypes.put("Dataset-Provider-ShortName", VAR_CATEGORICAL);
+
+    variableTypes.put("Dataset-Derivative-ProcessingLevel", VAR_ORDINAL);
+    variableTypes.put("Dataset-Derivative-TemporalResolution", VAR_ORDINAL);
+    variableTypes.put("Dataset-Derivative-SpatialResolution", VAR_ORDINAL);
+  }
+
+  private void initVariableWeight() {
+    variableWeights = new HashMap<>();
+
+    variableWeights.put("Dataset-Derivative-ProcessingLevel", 5);
+    variableWeights.put("DatasetParameter-Category", 5);
+    variableWeights.put("DatasetParameter-Variable", 5);
+    variableWeights.put("DatasetSource-Sensor-ShortName", 5);
+
+    variableWeights.put("DatasetPolicy-Availability", 4);
+    variableWeights.put("DatasetRegion-Region", 4);
+    variableWeights.put("DatasetSource-Source-Type", 4);
+    variableWeights.put("DatasetSource-Source-ShortName", 4);
+    variableWeights.put("DatasetParameter-Term", 4);
+    variableWeights.put("DatasetPolicy-DataFormat", 4);
+    variableWeights.put("Dataset-Derivative-SpatialResolution", 4);
+    variableWeights.put("Temporal_Covergae", 4);
+
+    variableWeights.put("DatasetParameter-Topic", 3);
+    variableWeights.put("Collection-ShortName", 3);
+    variableWeights.put("Dataset-Derivative-TemporalResolution", 3);
+    variableWeights.put("Spatial_Covergae", 3);
+
+    variableWeights.put("Dataset-ProjectionType", 1);
+    variableWeights.put("Dataset-Provider-ShortName", 1);
+  }
+
+  public void calculateVariableSimilarity(ESDriver es) {
+
+    es.createBulkProcessor();
+
+    List<Map<String, Object>> metadatas = new ArrayList<>();
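+    // page through all metadata documents in the index using the Elasticsearch scroll API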
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(metadataType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> metadataA = hit.getSource();
+        metadatas.add(metadataA);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    int size = metadatas.size();
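+    // compute a similarity document for every ordered pair of datasets (including a dataset with itself), so O(n^2) documents are written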
+    for (int i = 0; i < size; i++) {
+      Map<String, Object> metadataA = metadatas.get(i);
+      String shortNameA = (String) metadataA.get("Dataset-ShortName");
+
+      for (int j = 0; j < size; j++) {
+        Map<String, Object> metadataB = metadatas.get(j);
+        String shortNameB = (String) metadataB.get("Dataset-ShortName");
+
+        try {
+          XContentBuilder contentBuilder = jsonBuilder().startObject();
+          contentBuilder.field("concept_A", shortNameA);
+          contentBuilder.field("concept_B", shortNameB);
+
+          // spatial similarity
+          this.spatialSimilarity(metadataA, metadataB, contentBuilder);
+          // temporal similarity
+          this.temporalSimilarity(metadataA, metadataB, contentBuilder);
+          // categorical variables similarity
+          this.categoricalVariablesSimilarity(metadataA, metadataB, contentBuilder);
+          // ordinal variables similarity
+          this.ordinalVariablesSimilarity(metadataA, metadataB, contentBuilder);
+
+          contentBuilder.endObject();
+
+          IndexRequest ir = new IndexRequest(indexName, variableSimType).source(contentBuilder);
+          es.getBulkProcessor().add(ir);
+
+        } catch (IOException e1) {
+          LOG.error("Error writing variable similarity to Elasticsearch.", e1);
+        }
+
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+
+  /*
+   * Refer to P. Frontiera, R. Larson, and J. Radke (2008), "A comparison of
+   * geometric approaches to assessing spatial similarity for GIR",
+   * International Journal of Geographical Information Science, 22(3).
+   */
+  public void spatialSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
+
+    double topA = (double) metadataA.get("DatasetCoverage-Derivative-NorthLat");
+    double bottomA = (double) metadataA.get("DatasetCoverage-Derivative-SouthLat");
+    double leftA = (double) metadataA.get("DatasetCoverage-Derivative-WestLon");
+    double rightA = (double) metadataA.get("DatasetCoverage-Derivative-EastLon");
+    double areaA = (double) metadataA.get("DatasetCoverage-Derivative-Area");
+
+    double topB = (double) metadataB.get("DatasetCoverage-Derivative-NorthLat");
+    double bottomB = (double) metadataB.get("DatasetCoverage-Derivative-SouthLat");
+    double leftB = (double) metadataB.get("DatasetCoverage-Derivative-WestLon");
+    double rightB = (double) metadataB.get("DatasetCoverage-Derivative-EastLon");
+    double areaB = (double) metadataB.get("DatasetCoverage-Derivative-Area");
+
+    // Intersect area
+    double xOverlap = Math.max(0, Math.min(rightA, rightB) - Math.max(leftA, leftB));
+    double yOverlap = Math.max(0, Math.min(topA, topB) - Math.max(bottomA, bottomB));
+    double overlapArea = xOverlap * yOverlap;
+
+    // Calculate coverage similarity
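+    // average of the two overlap fractions (overlapArea / areaA and overlapArea / areaB), which stays in [0, 1]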
+    double similarity = 0.0;
+    if (areaA > 0 && areaB > 0) {
+      similarity = (overlapArea / areaA + overlapArea / areaB) * 0.5;
+    }
+
+    contentBuilder.field("Spatial_Covergae_Sim", similarity);
+  }
+
+  public void temporalSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
+
+    double similarity = 0.0;
+    double startTimeA = Double.parseDouble((String) metadataA.get("Dataset-DatasetCoverage-StartTimeLong"));
+    String endTimeAStr = (String) metadataA.get("Dataset-DatasetCoverage-StopTimeLong");
+    double endTimeA = 0.0;
+    if ("".equals(endTimeAStr)) {
+      endTimeA = System.currentTimeMillis();
+    } else {
+      endTimeA = Double.parseDouble(endTimeAStr);
+    }
+    double timespanA = endTimeA - startTimeA;
+
+    double startTimeB = Double.parseDouble((String) metadataB.get("Dataset-DatasetCoverage-StartTimeLong"));
+    String endTimeBStr = (String) metadataB.get("Dataset-DatasetCoverage-StopTimeLong");
+    double endTimeB = 0.0;
+    if ("".equals(endTimeBStr)) {
+      endTimeB = System.currentTimeMillis();
+    } else {
+      endTimeB = Double.parseDouble(endTimeBStr);
+    }
+    double timespanB = endTimeB - startTimeB;
+
+    double intersect = 0.0;
+    if (startTimeB >= endTimeA || endTimeB <= startTimeA) {
+      intersect = 0.0;
+    } else if (startTimeB >= startTimeA && endTimeB <= endTimeA) {
+      intersect = timespanB;
+    } else if (startTimeA >= startTimeB && endTimeA <= endTimeB) {
+      intersect = timespanA;
+    } else {
+      intersect = (startTimeA > startTimeB) ? (endTimeB - startTimeA) : (endTimeA - startTimeB);
+    }
+
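+    // normalize the overlap length by the geometric mean of the two time spans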
+    similarity = intersect / (Math.sqrt(timespanA) * Math.sqrt(timespanB));
+    contentBuilder.field("Temporal_Covergae_Sim", similarity);
+  }
+
+  public void categoricalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
+
+    for (String variable : variableTypes.keySet()) {
+      Integer type = variableTypes.get(variable);
+      if (!VAR_CATEGORICAL.equals(type)) {
+        continue;
+      }
+
+      double similarity = 0.0;
+      Object valueA = metadataA.get(variable);
+      Object valueB = metadataB.get(variable);
+      if (valueA instanceof ArrayList && valueB instanceof ArrayList) {
+        List<String> aList = (List<String>) valueA;
+        List<String> bList = (List<String>) valueB;
+        if (!aList.isEmpty()) {
+          // overlap fraction: how many of A's terms also appear in B
+          List<String> newAList = new ArrayList<>(aList);
+          newAList.retainAll(bList);
+          // floating-point division; integer division would truncate the result to 0 or 1
+          similarity = (double) newAList.size() / aList.size();
+        }
+
+      } else if (valueA instanceof String) {
+        if (valueA.equals(valueB)) {
+          similarity = 1.0;
+        }
+      }
+
+      contentBuilder.field(variable + "_Sim", similarity);
+    }
+  }
+
+  public void ordinalVariablesSimilarity(Map<String, Object> metadataA, Map<String, Object> metadataB, XContentBuilder contentBuilder) throws IOException {
+    for (String variable : variableTypes.keySet()) {
+      Integer type = variableTypes.get(variable);
+      if (!VAR_ORDINAL.equals(type)) {
+        continue;
+      }
+
+      double similarity = 0.0;
+      Object valueA = metadataA.get(variable);
+      Object valueB = metadataB.get(variable);
+      if (valueA != null && valueB != null) {
+
+        double a = (double) valueA;
+        double b = (double) valueB;
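+        // relative-difference similarity: 1 - |b - a| / a, floored at zero below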
+        if (a != 0.0) {
+          similarity = 1 - Math.abs(b - a) / a;
+          if (similarity < 0) {
+            similarity = 0.0;
+          }
+        }
+      }
+
+      contentBuilder.field(variable + "_Sim", similarity);
+    }
+  }
+
+  public static void addMapping(ESDriver es, String index, String type) {
+    XContentBuilder mapping;
+    try {
+      mapping = jsonBuilder().startObject().startObject(type).startObject("properties")
+          .startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject()
+          .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
+          .endObject().endObject().endObject();
+
+      es.getClient().admin().indices().preparePutMapping(index).setType(type).setSource(mapping).execute().actionGet();
+    } catch (IOException e) {
+      LOG.error("Error creating Elasticsearch mapping.", e);
+    }
+  }
+
+  public void normalizeVariableWeight(ESDriver es) {
+
+    es.createBulkProcessor();
+
+    double totalWeight = 0.0;
+    for (String variable : variableWeights.keySet()) {
+      totalWeight += variableWeights.get(variable);
+    }
+
+    SearchResponse scrollResp = es.getClient().prepareSearch(indexName).setTypes(variableSimType).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.matchAllQuery()).setSize(100).execute()
+        .actionGet();
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        Map<String, Object> similarities = hit.getSource();
+
+        double totalSim = 0.0;
+        for (String variable : variableWeights.keySet()) {
+          if (similarities.containsKey(variable + "_Sim")) {
+            double value = (double) similarities.get(variable + "_Sim");
+            double weight = variableWeights.get(variable);
+            totalSim += weight * value;
+          }
+        }
+
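+        // the final weight is the weighted average of the individual similarity fields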
+        double weight = totalSim / totalWeight;
+        UpdateRequest ur = es.generateUpdateRequest(indexName, variableSimType, hit.getId(), "weight", weight);
+        es.getBulkProcessor().add(ur);
+      }
+
+      scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/package-info.java
new file mode 100644
index 0000000..84231f7
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the processing required by the recommendation module.
+ */
+package gov.nasa.jpl.mudrod.recommendation.process;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/sessionBasedCF.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/sessionBasedCF.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/sessionBasedCF.java
new file mode 100644
index 0000000..ae55769
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/process/sessionBasedCF.java
@@ -0,0 +1,74 @@
+/**
+ * Project Name:mudrod-core
+ * File Name:sessionBasedCF.java
+ * Package Name:gov.nasa.jpl.mudrod.recommendation.process
+ * Date:Aug 19, 2016 3:17:00 PM
+ * Copyright (c) 2016, chenzhou1025@126.com All Rights Reserved.
+ */
+
+package gov.nasa.jpl.mudrod.recommendation.process;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.semantics.SemanticAnalyzer;
+import gov.nasa.jpl.mudrod.utils.LinkageTriple;
+import gov.nasa.jpl.mudrod.utils.SimilarityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * ClassName: Recommend metadata based on session-level co-occurrence
+ */
+public class sessionBasedCF extends DiscoveryStepAbstract {
+
+  private static final Logger LOG = LoggerFactory.getLogger(sessionBasedCF.class);
+
+  /**
+   * Creates a new instance of sessionBasedCF.
+   *
+   * @param props
+   *          the Mudrod configuration
+   * @param es
+   *          the Elasticsearch driver
+   * @param spark
+   *          the Spark driver
+   */
+  public sessionBasedCF(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("*****************Session based metadata similarity starts******************");
+    startTime = System.currentTimeMillis();
+
+    try {
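+      // compute dataset-to-dataset similarity from the session-level co-occurrence matrix using Pearson correlation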
+      String sessionMetadataFile = props.getProperty("session_metadata_Matrix");
+      File f = new File(sessionMetadataFile);
+      if (f.exists()) {
+        SemanticAnalyzer analyzer = new SemanticAnalyzer(props, es, spark);
+        List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(sessionMetadataFile, SimilarityUtil.SIM_PEARSON, 1);
+        analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataSessionBasedSimType"), true, false);
+      }
+
+    } catch (Exception e) {
+      LOG.error("Error during session based similarity calculation.", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************Session based metadata similarity ends******************Took {}s", (endTime - startTime) / 1000);
+
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/HybridRecommendation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/HybridRecommendation.java b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/HybridRecommendation.java
new file mode 100644
index 0000000..4163fda
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/recommendation/structure/HybridRecommendation.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.recommendation.structure;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodEngine;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.*;
+
+/**
+ * Recommend metadata using a combination of methods, including content-based
+ * (variable and abstract) similarity and session-level similarity
+ */
+public class HybridRecommendation extends DiscoveryStepAbstract {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  // recommended metadata list
+  protected transient List<LinkedTerm> termList = new ArrayList<>();
+  // format decimal
+  DecimalFormat df = new DecimalFormat("#.00");
+  // index name
+  protected static final String INDEX_NAME = "indexName";
+  private static final String WEIGHT = "weight";
+
+  /**
+   * Recommended data class. Date: Sep 12, 2016 2:25:28 AM
+   */
+  class LinkedTerm {
+    public String term = null;
+    public double weight = 0;
+    public String model = null;
+
+    public LinkedTerm(String str, double w, String m) {
+      term = str;
+      weight = w;
+      model = m;
+    }
+  }
+
+  public HybridRecommendation(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  /**
+   * Get recommended data for a given dataset
+   *
+   * @param input a given dataset
+   * @param num   the number of recommended datasets
+   * @return recommended datasets in JSON format
+   */
+  public JsonObject getRecomDataInJson(String input, int num) {
+    JsonObject resultJson = new JsonObject();
+
+    String type = props.getProperty("metadataCodeSimType");
+    Map<String, Double> sortedVariableSimMap = getRelatedData(type, input, num + 10);
+
+    type = props.getProperty("metadataWordTFIDFSimType");
+    Map<String, Double> sortedAbstractSimMap = getRelatedData(type, input, num + 10);
+
+    type = props.getProperty("metadataSessionBasedSimType");
+    Map<String, Double> sortedSessionSimMap = getRelatedData(type, input, num + 10);
+
+    JsonElement variableSimJson = mapToJson(sortedVariableSimMap, num);
+    resultJson.add("variableSim", variableSimJson);
+    JsonElement abstractSimJson = mapToJson(sortedAbstractSimMap, num);
+    resultJson.add("abstractSim", abstractSimJson);
+    JsonElement sessionSimJson = mapToJson(sortedSessionSimMap, num);
+    resultJson.add("sessionSim", sessionSimJson);
+
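+    // sum the three similarity scores into one hybrid score; the commented-out factors below are per-method weights that are currently disabled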
+    Map<String, Double> hybridSimMap = new HashMap<>();
+
+    for (String name : sortedAbstractSimMap.keySet()) {
+      hybridSimMap.put(name, sortedAbstractSimMap.get(name) /** 0.4 */);
+    }
+
+    for (String name : sortedVariableSimMap.keySet()) {
+      if (hybridSimMap.get(name) != null) {
+        double sim = hybridSimMap.get(name) + sortedVariableSimMap.get(name) /** 0.3 */;
+        hybridSimMap.put(name, Double.parseDouble(df.format(sim)));
+      } else {
+        double sim = sortedVariableSimMap.get(name);
+        hybridSimMap.put(name, Double.parseDouble(df.format(sim)));
+      }
+    }
+
+    for (String name : sortedSessionSimMap.keySet()) {
+      if (hybridSimMap.get(name) != null) {
+        double sim = hybridSimMap.get(name) + sortedSessionSimMap.get(name) /** 0.1 */;
+        hybridSimMap.put(name, Double.parseDouble(df.format(sim)));
+      } else {
+        double sim = sortedSessionSimMap.get(name);
+        hybridSimMap.put(name, Double.parseDouble(df.format(sim)));
+      }
+    }
+
+    Map<String, Double> sortedHybridSimMap = this.sortMapByValue(hybridSimMap);
+
+    JsonElement linkedJson = mapToJson(sortedHybridSimMap, num);
+    resultJson.add("linked", linkedJson);
+
+    return resultJson;
+  }
+
+  /**
+   * Method of converting a map of term weights to JSON
+   *
+   * @param wordweights a map from related metadata to weights
+   * @param num         the number of converted elements
+   * @return converted JSON object
+   */
+  protected JsonElement mapToJson(Map<String, Double> wordweights, int num) {
+    Gson gson = new Gson();
+
+    List<JsonObject> nodes = new ArrayList<>();
+    Set<String> words = wordweights.keySet();
+    int i = 0;
+    for (String wordB : words) {
+      JsonObject node = new JsonObject();
+      node.addProperty("name", wordB);
+      node.addProperty("weight", wordweights.get(wordB));
+      nodes.add(node);
+
+      i += 1;
+      if (i >= num) {
+        break;
+      }
+    }
+
+    String nodesJson = gson.toJson(nodes);
+    JsonElement nodesElement = gson.fromJson(nodesJson, JsonElement.class);
+
+    return nodesElement;
+  }
+
+  /**
+   * Get recommended datasets for a given dataset
+   *
+   * @param type  recommendation method
+   * @param input a given dataset
+   * @param num   the number of recommended datasets
+   * @return recommended dataset map; the key is the dataset name, the value is
+   * the similarity score
+   */
+  public Map<String, Double> getRelatedData(String type, String input, int num) {
+    termList = new ArrayList<>();
+    Map<String, Double> termsMap = new HashMap<>();
+    Map<String, Double> sortedMap = new HashMap<>();
+    try {
+      List<LinkedTerm> links = getRelatedDataFromES(type, input, num);
+      int size = links.size();
+      for (int i = 0; i < size; i++) {
+        termsMap.put(links.get(i).term, links.get(i).weight);
+      }
+
+      sortedMap = sortMapByValue(termsMap);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return sortedMap;
+  }
+
+  /**
+   * Get recommended datasets for a given dataset from Elasticsearch
+   *
+   * @param type  recommendation method
+   * @param input a given dataset
+   * @param num   the number of recommended datasets
+   * @return recommended dataset list
+   */
+  public List<LinkedTerm> getRelatedDataFromES(String type, String input, int num) {
+
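+    // query the precomputed similarity documents whose concept_A matches the input, sorted by weight in descending order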
+    SearchRequestBuilder builder = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
+        .setSize(num);
+
+    SearchResponse usrhis = builder.execute().actionGet();
+
+    for (SearchHit hit : usrhis.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String conceptB = (String) result.get("concept_B");
+
+      if (!conceptB.equals(input)) {
+        LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), type);
+        termList.add(lTerm);
+      }
+    }
+
+    return termList;
+  }
+
+  /**
+   * Method of sorting a map by value in descending order
+   *
+   * @param passedMap input map
+   * @return sorted map
+   */
+  public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
+    // sort entries by value in descending order and preserve that order in a LinkedHashMap
+    LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
+    passedMap.entrySet().stream()
+        .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
+        .forEachOrdered(e -> sortedMap.put(e.getKey(), e.getValue()));
+    return sortedMap;
+  }
+
+  public static void main(String[] args) throws IOException {
+
+    MudrodEngine me = new MudrodEngine();
+    Properties props = me.loadConfig();
+    ESDriver es = new ESDriver(me.getConfig());
+    HybridRecommendation test = new HybridRecommendation(props, es, null);
+
+    // String input = "NSCAT_LEVEL_1.7_V2";
+    String input = "AQUARIUS_L3_SSS_SMIA_MONTHLY-CLIMATOLOGY_V4";
+    JsonObject json = test.getRecomDataInJson(input, 10);
+
+    System.out.println(json.toString());
+  }
+}