Posted to commits@sdap.apache.org by le...@apache.org on 2017/12/19 14:13:17 UTC

[16/17] incubator-sdap-mudrod git commit: SDAP-7 Change all package namespaces to org.apache.sdap
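
For context, SDAP-7 renames every package from the gov.nasa.jpl.mudrod prefix
to the org.apache.sdap prefix, so each file below is deleted here and re-added
under the new namespace elsewhere in this patch series. A minimal sketch of the
per-file change (the new sub-package layout is assumed to mirror the old one):

    -package gov.nasa.jpl.mudrod.driver;
    +package org.apache.sdap.mudrod.driver;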

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
deleted file mode 100644
index d1af04a..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.driver;
-
-import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import com.google.gson.GsonBuilder;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.main.MudrodEngine;
-import gov.nasa.jpl.mudrod.utils.ESTransportClient;
-import org.apache.commons.lang.StringUtils;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.Fuzziness;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.suggest.Suggest;
-import org.elasticsearch.search.suggest.SuggestBuilder;
-import org.elasticsearch.search.suggest.SuggestBuilders;
-import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-/**
- * Driver implementation for all Elasticsearch functionality.
- */
-public class ESDriver implements Serializable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class);
-  private static final long serialVersionUID = 1L;
-  private transient Client client = null;
-  private transient Node node = null;
-  private transient BulkProcessor bulkProcessor = null;
-
-  /**
-   * Default constructor for this class. To load client configuration, call
-   * the parameterized constructor instead.
-   */
-  public ESDriver() {
-    // Default constructor, to load configuration call ESDriver(props)
-  }
-
-  /**
-   * Parameterized constructor which accepts a {@link java.util.Properties}
-   *
-   * @param props a populated properties object.
-   */
-  public ESDriver(Properties props) {
-    try {
-      setClient(makeClient(props));
-    } catch (IOException e) {
-      LOG.error("Error whilst constructing Elastcisearch client.", e);
-    }
-  }
-
-  public void createBulkProcessor() {
-    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500);
-    setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
-      @Override
-      public void beforeBulk(long executionId, BulkRequest request) {
-        LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!");
-      }
-
-      @Override
-      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-        LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!");
-      }
-
-      @Override
-      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-        LOG.error("Bulk request has failed!");
-        throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure);
-      }
-    }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1)
-        .build());
-  }
-
-  public void destroyBulkProcessor() {
-    try {
-      getBulkProcessor().awaitClose(20, TimeUnit.MINUTES);
-      setBulkProcessor(null);
-      refreshIndex();
-    } catch (InterruptedException e) {
-      LOG.error("Error destroying the Bulk Processor.", e);
-    }
-  }
-
-  public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException {
-
-    boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists();
-    if (exists) {
-      return;
-    }
-
-    getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
-    getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet();
-  }
-
-  public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException {
-    return this.customAnalyzing(indexName, "cody", str);
-  }
-
-  public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException {
-    String[] strList = str.toLowerCase().split(",");
-    for (int i = 0; i < strList.length; i++) {
-      String tmp = "";
-      AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get();
-      for (AnalyzeToken token : r.getTokens()) {
-        tmp += token.getTerm() + " ";
-      }
-      strList[i] = tmp.trim();
-    }
-    return String.join(",", strList);
-  }
-
-  public List<String> customAnalyzing(String indexName, List<String> list) throws InterruptedException, ExecutionException {
-    if (list == null) {
-      return list;
-    }
-    int size = list.size();
-    List<String> customlist = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      customlist.add(this.customAnalyzing(indexName, list.get(i)));
-    }
-
-    return customlist;
-  }
-
-  public void deleteAllByQuery(String index, String type, QueryBuilder query) {
-    createBulkProcessor();
-    SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
-        .actionGet();
-
-    while (true) {
-      for (SearchHit hit : scrollResp.getHits().getHits()) {
-        DeleteRequest deleteRequest = new DeleteRequest(index, type, hit.getId());
-        getBulkProcessor().add(deleteRequest);
-      }
-
-      scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
-      if (scrollResp.getHits().getHits().length == 0) {
-        break;
-      }
-
-    }
-    destroyBulkProcessor();
-  }
-
-  public void deleteType(String index, String type) {
-    this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery());
-  }
-
-  public List<String> getTypeListWithPrefix(Object object, Object object2) {
-    ArrayList<String> typeList = new ArrayList<>();
-    GetMappingsResponse res;
-    try {
-      res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get();
-      ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(object.toString());
-      for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
-        if (c.key.startsWith(object2.toString())) {
-          typeList.add(c.key);
-        }
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e);
-    }
-    return typeList;
-  }
-
-  public List<String> getIndexListWithPrefix(Object object) {
-
-    LOG.info("Retrieving index list with prefix: {}", object.toString());
-    String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
-
-    ArrayList<String> indexList = new ArrayList<>();
-    int length = indices.length;
-    for (int i = 0; i < length; i++) {
-      String indexName = indices[i];
-      if (indexName.startsWith(object.toString())) {
-        indexList.add(indexName);
-      }
-    }
-
-    return indexList;
-  }
-
-  public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException {
-    return searchByQuery(index, type, query, false);
-  }
-
-  @SuppressWarnings("unchecked")
-  public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException {
-    boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
-    if (!exists) {
-      return null;
-    }
-
-    QueryBuilder qb = QueryBuilders.queryStringQuery(query);
-    SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet();
-
-    // Map of K,V pairs where the key is the field name from the search result and the value is the name that should be returned for that field. Not always the same.
-    Map<String, String> fieldsToReturn = new HashMap<>();
-
-    fieldsToReturn.put("Dataset-ShortName", "Short Name");
-    fieldsToReturn.put("Dataset-LongName", "Long Name");
-    fieldsToReturn.put("DatasetParameter-Topic", "Topic");
-    fieldsToReturn.put("Dataset-Description", "Dataset-Description");
-    fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date");
-
-    if (bDetail) {
-      fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat");
-      fieldsToReturn.put("Dataset-Doi", "Dataset-Doi");
-      fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level");
-      fieldsToReturn.put("DatasetCitation-Version", "Version");
-      fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName");
-      fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName");
-      fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category");
-      fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath");
-      fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full");
-      fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full");
-      fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail");
-
-      fieldsToReturn.put("DatasetRegion-Region", "Region");
-      fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat");
-      fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat");
-      fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon");
-      fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon");
-      fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long");
-      fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong");
-
-      fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution");
-      fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat");
-      fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution");
-      fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution");
-      fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution");
-      fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution");
-    }
-
-    List<Map<String, Object>> searchResults = new ArrayList<>();
-
-    for (SearchHit hit : response.getHits().getHits()) {
-      Map<String, Object> source = hit.getSource();
-
-      Map<String, Object> searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey()))
-          .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue));
-
-      // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result
-
-      // Some results require special handling/formatting:
-      // Release Date formatting
-      LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList<String>) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate();
-      searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE));
-
-      if (bDetail) {
-
-        // DataFormat value, translate RAW to BINARY
-        if ("RAW".equals(searchResult.get("DataFormat"))) {
-          searchResult.put("DataFormat", "BINARY");
-        }
-
-        // DatasetLocationPolicy-BasePath should only contain ftp, http, or https URLs
-        List<String> urls = ((List<String>) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList());
-        searchResult.put("DatasetLocationPolicy-BasePath", urls);
-
-        // Time Span Formatting
-        LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate();
-        LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ?
-            null :
-            Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate();
-        searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE)));
-
-        // Temporal resolution can come from one of two fields
-        searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution"));
-
-        // Special formatting for spatial resolution
-        String latResolution = (String) searchResult.get("Dataset-LatitudeResolution");
-        String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution");
-        if (!latResolution.isEmpty() && !lonResolution.isEmpty()) {
-          searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)");
-        } else {
-          String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution");
-          String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution");
-          double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000;
-          double dAlonResolution = Double.parseDouble(alonResolution) / 1000;
-          searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)");
-        }
-
-        // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies.
-        List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"),
-            (List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));
-
-        searchResult.put("Measurements", measurements);
-
-      }
-
-      searchResults.add(searchResult);
-    }
-
-    Map<String, List<?>> pdResults = new HashMap<>();
-    pdResults.put("PDResults", searchResults);
-
-    return new GsonBuilder().create().toJson(pdResults);
-  }
-
-  /**
-   * Builds a List of Measurement Hierarchies given the individual source lists.
-   * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail
-   * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete.
-   *
-   * For example, if the input is:
-   * <pre>
-   * topics = ["Oceans", "Oceans"]
-   * terms = ["Sea Surface Topography", "Ocean Waves"]
-   * variables = ["Sea Surface Height", "Significant Wave Height"]
-   * variableDetails = ["None", "None"]
-   * </pre>
-   *
-   * The output would be:
-   * <pre>
-   *   [
-   *     ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
-   *     ["Oceans", "Ocean Waves", "Significant Wave Height"]
-   *   ]
-   * </pre>
-   *     Oceans > Sea Surface Topography > Sea Surface Height
-   *     Oceans > Ocean Waves > Significant Wave Height
-   *
-   * @param topics List of topics, the first element of a measurement
-   * @param terms List of terms, the second element of a measurement
-   * @param variables List of variables, the third element of a measurement
-   * @param variableDetails List of variable details, the fourth element of a measurement
-   *
-   * @return A List where each element is a single hierarchy (as a List) built from the provided input lists.
-   */
-  private List<List<String>> buildMeasurementHierarchies(List<String> topics, List<String> terms, List<String> variables, List<String> variableDetails) {
-
-    List<List<String>> measurements = new ArrayList<>();
-
-    for (int x = 0; x < topics.size(); x++) {
-      measurements.add(new ArrayList<>());
-      measurements.get(x).add(topics.get(x));
-      // Only add the next 'level' if we can
-      if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) {
-        measurements.get(x).add(terms.get(x));
-        if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) {
-          measurements.get(x).add(variables.get(x));
-          if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) {
-            measurements.get(x).add(variableDetails.get(x));
-          }
-        }
-      }
-    }
-
-    return measurements;
-
-  }
-
-  public List<String> autoComplete(String index, String term) {
-    boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
-    if (!exists) {
-      return new ArrayList<>();
-    }
-
-    Set<String> suggestHS = new HashSet<String>();
-    List<String> suggestList = new ArrayList<>();
-
-    // please make sure that the completion field is configured in the ES mapping
-    CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100);
-    SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
-    SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet();
-
-    Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator();
-
-    while (iterator.hasNext()) {
-      Suggest.Suggestion.Entry.Option next = iterator.next();
-      String suggest = next.getText().string().toLowerCase();
-      suggestList.add(suggest);
-    }
-
-    suggestHS.addAll(suggestList);
-    suggestList.clear();
-    suggestList.addAll(suggestHS);
-    return suggestList;
-  }
-
-  public void close() {
-    client.close();
-  }
-
-  public void refreshIndex() {
-    client.admin().indices().prepareRefresh().execute().actionGet();
-  }
-
-  /**
-   * Generates a TransportClient or NodeClient
-   *
-   * @param props a populated {@link java.util.Properties} object
-   * @return a constructed {@link org.elasticsearch.client.Client}
-   * @throws IOException if there is an error building the
-   *                     {@link org.elasticsearch.client.Client}
-   */
-  protected Client makeClient(Properties props) throws IOException {
-    String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER);
-    String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
-    String[] hosts = hostsString.split(",");
-    String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT);
-    int port = Integer.parseInt(portStr);
-
-    Settings.Builder settingsBuilder = Settings.builder();
-
-    // Set the cluster name and build the settings
-    if (!clusterName.isEmpty())
-      settingsBuilder.put("cluster.name", clusterName);
-
-    settingsBuilder.put("http.type", "netty3");
-    settingsBuilder.put("transport.type", "netty3");
-
-    Settings settings = settingsBuilder.build();
-
-    Client client = null;
-
-    // Prefer TransportClient
-    if (hosts != null && port > 1) {
-      TransportClient transportClient = new ESTransportClient(settings);
-      for (String host : hosts)
-        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
-      client = transportClient;
-    } else if (clusterName != null) {
-      node = new Node(settings);
-      client = node.client();
-    }
-
-    return client;
-  }
-
-  /**
-   * Main method used to invoke the ESDriver implementation.
-   *
-   * @param args no arguments are required to invoke the Driver.
-   */
-  public static void main(String[] args) {
-    MudrodEngine mudrodEngine = new MudrodEngine();
-    ESDriver es = new ESDriver(mudrodEngine.loadConfig());
-    es.getTypeListWithPrefix("podaacsession", "sessionstats");
-  }
-
-  /**
-   * @return the client
-   */
-  public Client getClient() {
-    return client;
-  }
-
-  /**
-   * @param client the client to set
-   */
-  public void setClient(Client client) {
-    this.client = client;
-  }
-
-  /**
-   * @return the bulkProcessor
-   */
-  public BulkProcessor getBulkProcessor() {
-    return bulkProcessor;
-  }
-
-  /**
-   * @param bulkProcessor the bulkProcessor to set
-   */
-  public void setBulkProcessor(BulkProcessor bulkProcessor) {
-    this.bulkProcessor = bulkProcessor;
-  }
-
-  public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) {
-
-    UpdateRequest ur = null;
-    try {
-      ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
-    } catch (IOException e) {
-      LOG.error("Error whilst attempting to generate a new Update Request.", e);
-    }
-
-    return ur;
-  }
-
-  public UpdateRequest generateUpdateRequest(String index, String type, String id, Map<String, Object> filedValueMap) {
-
-    UpdateRequest ur = null;
-    try {
-      XContentBuilder builder = jsonBuilder().startObject();
-      for (Entry<String, Object> entry : filedValueMap.entrySet()) {
-        String key = entry.getKey();
-        builder.field(key, filedValueMap.get(key));
-      }
-      builder.endObject();
-      ur = new UpdateRequest(index, type, id).doc(builder);
-    } catch (IOException e) {
-      LOG.error("Error whilst attempting to generate a new Update Request.", e);
-    }
-
-    return ur;
-  }
-
-  public int getDocCount(String index, String... type) {
-    MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
-    String[] indexArr = new String[] { index };
-    return this.getDocCount(indexArr, type, search);
-  }
-
-  public int getDocCount(String[] index, String[] type) {
-    MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
-    return this.getDocCount(index, type, search);
-  }
-
-  public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) {
-    SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0);
-    SearchResponse countSr = countSrBuilder.execute().actionGet();
-    int docCount = (int) countSr.getHits().getTotalHits();
-    return docCount;
-  }
-}
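
The ESDriver deleted above bundles client construction, bulk indexing, and
query helpers behind one class. A minimal usage sketch based only on the API
visible in this diff (the index, type, id, and field values are hypothetical):

    import gov.nasa.jpl.mudrod.driver.ESDriver;
    import gov.nasa.jpl.mudrod.main.MudrodEngine;

    public class ESDriverSketch {
      public static void main(String[] args) {
        MudrodEngine engine = new MudrodEngine();
        ESDriver es = new ESDriver(engine.loadConfig()); // client built from config.xml
        es.createBulkProcessor();                        // 1000 actions per bulk, exponential backoff
        // "podaacsession", "sessionstats", "doc-1" and "views" are illustrative values
        es.getBulkProcessor().add(
            es.generateUpdateRequest("podaacsession", "sessionstats", "doc-1", "views", 42));
        es.destroyBulkProcessor();                       // flushes, waits up to 20 minutes, refreshes
        es.close();
      }
    }

Note that destroyBulkProcessor() refreshes the indices after closing, so reads
issued immediately afterwards observe the bulked writes.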

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
deleted file mode 100644
index e49c029..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.driver;
-
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.spark.sql.SQLContext;
-
-import java.io.File;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.util.Properties;
-//import org.apache.spark.sql.SparkSession;
-
-public class SparkDriver implements Serializable {
-
-  //TODO the commented out code below is the API upgrade
-  //for Spark 2.0.0. It requires a large upgrade and simplification
-  //across the mudrod codebase so should be done in an individual ticket.
-  //  /**
-  //   *
-  //   */
-  //  private static final long serialVersionUID = 1L;
-  //  private SparkSession builder;
-  //
-  //  public SparkDriver() {
-  //    builder = SparkSession.builder()
-  //        .master("local[2]")
-  //        .config("spark.hadoop.validateOutputSpecs", "false")
-  //        .config("spark.files.overwrite", "true")
-  //        .getOrCreate();
-  //  }
-  //
-  //  public SparkSession getBuilder() {
-  //    return builder;
-  //  }
-  //
-  //  public void setBuilder(SparkSession builder) {
-  //    this.builder = builder;
-  //  }
-  //
-  //  public void close() {
-  //    builder.stop();
-  //  }
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  public transient JavaSparkContext sc;
-  public transient SQLContext sqlContext;
-
-  public SparkDriver() {
-    // empty default constructor
-  }
-
-  public SparkDriver(Properties props) {
-    SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME, "MudrodSparkApp")).setIfMissing("spark.master", props.getProperty(MudrodConstants.SPARK_MASTER))
-        .set("spark.hadoop.validateOutputSpecs", "false").set("spark.files.overwrite", "true");
-
-    String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
-    String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT);
-
-    if (!"".equals(esHost)) {
-      conf.set("es.nodes", esHost);
-    }
-
-    if (!"".equals(esPort)) {
-      conf.set("es.port", esPort);
-    }
-
-    conf.set("spark.serializer", KryoSerializer.class.getName());
-    conf.set("es.batch.size.entries", "1500");
-
-    sc = new JavaSparkContext(conf);
-    sqlContext = new SQLContext(sc);
-  }
-
-  public void close() {
-    sc.sc().stop();
-  }
-}
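
SparkDriver above exposes its JavaSparkContext and SQLContext as public fields
rather than through accessors, so callers use them directly. A hedged
consumption sketch (the input path is illustrative):

    import gov.nasa.jpl.mudrod.driver.SparkDriver;
    import gov.nasa.jpl.mudrod.main.MudrodEngine;
    import org.apache.spark.api.java.JavaRDD;

    public class SparkDriverSketch {
      public static void main(String[] args) {
        SparkDriver spark = new SparkDriver(new MudrodEngine().loadConfig());
        // es.nodes and es.port were set on the SparkConf above, so
        // elasticsearch-hadoop jobs can reach the configured cluster.
        JavaRDD<String> lines = spark.sc.textFile("file:///tmp/weblog.log");
        System.out.println(lines.count());
        spark.close();
      }
    }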

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
deleted file mode 100644
index d9cde36..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes commonly used Elasticsearch and Spark related
- * functions
- */
-package gov.nasa.jpl.mudrod.driver;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
deleted file mode 100644
index 14b667d..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.integration;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.DecimalFormat;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-/**
- * Supports integrating vocabulary similarity results from metadata, ontology, and web logs.
- */
-public class LinkageIntegration extends DiscoveryStepAbstract {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LinkageIntegration.class);
-  private static final long serialVersionUID = 1L;
-  transient List<LinkedTerm> termList = new ArrayList<>();
-  DecimalFormat df = new DecimalFormat("#.00");
-  private static final String INDEX_NAME = "indexName";
-  private static final String WEIGHT = "weight";
-
-  public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  /**
-   * The data structure used to store a semantic triple.
-   */
-  class LinkedTerm {
-    String term = null;
-    double weight = 0;
-    String model = null;
-
-    public LinkedTerm(String str, double w, String m) {
-      term = str;
-      weight = w;
-      model = m;
-    }
-  }
-
-  /**
-   * Method of executing integration step
-   */
-  @Override
-  public Object execute() {
-    getIngeratedList("ocean wind", 11);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  /**
-   * Method of applying the majority rule to integrate related terms
-   *
-   * @param input query string
-   * @return a hash map where the key is a related term, and the value is its
-   * similarity to the input query
-   */
-  public Map<String, Double> appyMajorRule(String input) {
-    termList = new ArrayList<>();
-    Map<String, Double> termsMap = new HashMap<>();
-    Map<String, List<LinkedTerm>> map = new HashMap<>();
-    try {
-      map = aggregateRelatedTermsFromAllmodel(es.customAnalyzing(props.getProperty(INDEX_NAME), input));
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error("Error applying majority rule", e);
-    }
-
-    for (Entry<String, List<LinkedTerm>> entry : map.entrySet()) {
-      List<LinkedTerm> list = entry.getValue();
-      double sumModelWeight = 0;
-      double tmp = 0;
-      for (LinkedTerm element : list) {
-        sumModelWeight += getModelweight(element.model);
-
-        if (element.weight > tmp) {
-          tmp = element.weight;
-        }
-      }
-
-      double finalWeight = tmp + ((sumModelWeight - 2) * 0.05);
-      if (finalWeight < 0) {
-        finalWeight = 0;
-      }
-
-      if (finalWeight > 1) {
-        finalWeight = 1;
-      }
-      termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight)));
-    }
-
-    return sortMapByValue(termsMap);
-  }
-
-  /**
-   * Method of getting integrated results
-   *
-   * @param input query string
-   * @param num   the number of most related terms
-   * @return a string of related terms along with corresponding similarities
-   */
-  public String getIngeratedList(String input, int num) {
-    String output = "";
-    Map<String, Double> sortedMap = appyMajorRule(input);
-    int count = 0;
-    for (Entry<String, Double> entry : sortedMap.entrySet()) {
-      if (count < num) {
-        output += entry.getKey() + " = " + entry.getValue() + ", ";
-      }
-      count++;
-    }
-    LOG.info("\n************************Integrated results***************************");
-    LOG.info(output);
-    return output;
-  }
-
-  /**
-   * Method of getting integrated results
-   *
-   * @param input query string
-   * @return a JSON object of related terms along with corresponding similarities
-   */
-  public JsonObject getIngeratedListInJson(String input) {
-    Map<String, Double> sortedMap = appyMajorRule(input);
-    int count = 0;
-    Map<String, Double> trimmedMap = new LinkedHashMap<>();
-    for (Entry<String, Double> entry : sortedMap.entrySet()) {
-      if (!entry.getKey().contains("china")) {
-        if (count < 10) {
-          trimmedMap.put(entry.getKey(), entry.getValue());
-        }
-        count++;
-      }
-    }
-
-    return mapToJson(trimmedMap);
-  }
-
-  /**
-   * Method of aggregating terms from web logs, metadata, and ontology
-   *
-   * @param input query string
-   * @return a hash map where the key is a related term, and the value is the
-   * list of similarities from different sources
-   */
-  public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) {
-    aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType"));
-    aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType"));
-    aggregateRelatedTerms(input, props.getProperty("metadataLinkageType"));
-    aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType"));
-
-    return termList.stream().collect(Collectors.groupingBy(w -> w.term));
-  }
-
-  public int getModelweight(String model) {
-    if (model.equals(props.getProperty("userHistoryLinkageType"))) {
-      return Integer.parseInt(props.getProperty("userHistory_w"));
-    }
-
-    if (model.equals(props.getProperty("clickStreamLinkageType"))) {
-      return Integer.parseInt(props.getProperty("clickStream_w"));
-    }
-
-    if (model.equals(props.getProperty("metadataLinkageType"))) {
-      return Integer.parseInt(props.getProperty("metadata_w"));
-    }
-
-    if (model.equals(props.getProperty("ontologyLinkageType"))) {
-      return Integer.parseInt(props.getProperty("ontology_w"));
-    }
-
-    return 999999;
-  }
-
-  /**
-   * Method of extracting the related term from a comma-separated string
-   *
-   * @param str   input string
-   * @param input query string
-   * @return related term contained in the input string
-   */
-  public String extractRelated(String str, String input) {
-    String[] strList = str.split(",");
-    if (input.equals(strList[0])) {
-      return strList[1];
-    } else {
-      return strList[0];
-    }
-  }
-
-  public void aggregateRelatedTerms(String input, String model) {
-    //get the first 10 related terms
-    SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("keywords", input)).addSort(WEIGHT, SortOrder.DESC).setSize(11)
-        .execute().actionGet();
-
-    LOG.info("\n************************ {} results***************************", model);
-    for (SearchHit hit : usrhis.getHits().getHits()) {
-      Map<String, Object> result = hit.getSource();
-      String keywords = (String) result.get("keywords");
-      String relatedKey = extractRelated(keywords, input);
-
-      if (!relatedKey.equals(input)) {
-        LinkedTerm lTerm = new LinkedTerm(relatedKey, (double) result.get(WEIGHT), model);
-        LOG.info("( {} {} )", relatedKey, (double) result.get(WEIGHT));
-        termList.add(lTerm);
-      }
-
-    }
-  }
-
-  /**
-   * Method of querying related terms from ontology
-   *
-   * @param input input query
-   * @param model source name
-   */
-  public void aggregateRelatedTermsSWEET(String input, String model) {
-    SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
-        .setSize(11).execute().actionGet();
-    LOG.info("\n************************ {} results***************************", model);
-    for (SearchHit hit : usrhis.getHits().getHits()) {
-      Map<String, Object> result = hit.getSource();
-      String conceptB = (String) result.get("concept_B");
-      if (!conceptB.equals(input)) {
-        LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), model);
-        LOG.info("( {} {} )", conceptB, (double) result.get(WEIGHT));
-        termList.add(lTerm);
-      }
-    }
-  }
-
-  /**
-   * Method of sorting a map by value
-   *
-   * @param passedMap input map
-   * @return sorted map
-   */
-  public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
-    List<String> mapKeys = new ArrayList<>(passedMap.keySet());
-    List<Double> mapValues = new ArrayList<>(passedMap.values());
-    Collections.sort(mapValues, Collections.reverseOrder());
-    Collections.sort(mapKeys, Collections.reverseOrder());
-
-    LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
-
-    Iterator<Double> valueIt = mapValues.iterator();
-    while (valueIt.hasNext()) {
-      Object val = valueIt.next();
-      Iterator<String> keyIt = mapKeys.iterator();
-
-      while (keyIt.hasNext()) {
-        Object key = keyIt.next();
-        String comp1 = passedMap.get(key).toString();
-        String comp2 = val.toString();
-
-        if (comp1.equals(comp2)) {
-          passedMap.remove(key);
-          mapKeys.remove(key);
-          sortedMap.put((String) key, (Double) val);
-          break;
-        }
-
-      }
-
-    }
-    return sortedMap;
-  }
-
-  /**
-   * Method of converting hashmap to JSON
-   *
-   * @param wordweights a map from related terms to weights
-   * @return converted JSON object
-   */
-  private JsonObject mapToJson(Map<String, Double> wordweights) {
-    Gson gson = new Gson();
-    JsonObject json = new JsonObject();
-    List<JsonObject> nodes = new ArrayList<>();
-
-    for (Entry<String, Double> entry : wordweights.entrySet()) {
-      JsonObject node = new JsonObject();
-      String key = entry.getKey();
-      Double value = entry.getValue();
-      node.addProperty("word", key);
-      node.addProperty("weight", value);
-      nodes.add(node);
-    }
-
-    JsonElement nodesElement = gson.toJsonTree(nodes);
-    json.add("ontology", nodesElement);
-
-    return json;
-  }
-
-}
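
The majority rule in appyMajorRule() above takes the strongest single-model
similarity for a term and nudges it by the summed weights of the models that
agree, clamping to [0, 1]. A worked example with hypothetical numbers:

    // A term linked by two models: similarity 0.9 from a weight-3 model,
    // similarity 0.7 from a weight-2 model.
    double tmp = Math.max(0.9, 0.7);                          // strongest similarity: 0.9
    double sumModelWeight = 3 + 2;                            // getModelweight() summed
    double finalWeight = tmp + ((sumModelWeight - 2) * 0.05); // 0.9 + 0.15 = 1.05
    finalWeight = Math.min(1.0, Math.max(0.0, finalWeight));  // clamped to 1.0, as in the code

So agreement across several high-weight models can saturate a term at 1.0 even
when no single model rated it that highly.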

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
deleted file mode 100644
index 7f2ba66..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes methods for integrating web log, ontology, and
- * metadata mining results.
- */
-package gov.nasa.jpl.mudrod.integration;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java
deleted file mode 100644
index ee7cbfa..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.main;
-
-import gov.nasa.jpl.mudrod.ontology.Ontology;
-
-/**
- * Class contains static constant keys and values relating to Mudrod
- * configuration properties. Property values are read from <a href=
- * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>
- */
-public interface MudrodConstants {
-
-  public static final String CLEANUP_TYPE_PREFIX = "Cleanup_type_prefix";
-
-  public static final String CLICK_STREAM_LINKAGE_TYPE = "clickStreamLinkageType";
-
-  public static final String CLICK_STREAM_MATRIX_TYPE = "clickStreamMatrixType";
-
-  public static final String CLICKSTREAM_SVD_DIM = "clickstreamSVDDimension";
-
-  public static final String CLICKSTREAM_W = "clickStream_w";
-
-  public static final String COMMENT_TYPE = "commentType";
-
-  /** Defined on CLI */
-  public static final String DATA_DIR = "dataDir";
-
-  public static final String DOWNLOAD_F = "downloadf";
-
-  public static final String DOWNLOAD_WEIGHT = "downloadWeight";
-
-  public static final String ES_CLUSTER = "clusterName";
-
-  public static final String ES_TRANSPORT_TCP_PORT = "ES_Transport_TCP_Port";
-
-  public static final String ES_UNICAST_HOSTS = "ES_unicast_hosts";
-
-  public static final String ES_HTTP_PORT = "ES_HTTP_port";
-
-  public static final String ES_INDEX_NAME = "indexName";
-
-  public static final String FTP_PREFIX = "ftpPrefix";
-
-  public static final String FTP_TYPE_PREFIX = "FTP_type_prefix";
-
-  public static final String HTTP_PREFIX = "httpPrefix";
-
-  public static final String HTTP_TYPE_PREFIX = "HTTP_type_prefix";
-
-  public static final String LOG_INDEX = "logIndexName";
-
-  public static final String METADATA_LINKAGE_TYPE = "metadataLinkageType";
-
-  public static final String METADATA_SVD_DIM = "metadataSVDDimension";
-
-  public static final String METADATA_URL = "metadataurl";
-
-  public static final String METADATA_W = "metadata_w";
-
-  public static final String MINI_USER_HISTORY = "mini_userHistory";
-
-  public static final String MUDROD = "mudrod";
-
-  /** Defined on CLI */
-  public static final String MUDROD_CONFIG = "MUDROD_CONFIG";
-  /**
-   * An {@link Ontology} implementation.
-   */
-  public static final String ONTOLOGY_IMPL = MUDROD + "ontology.implementation";
-
-  public static final String ONTOLOGY_LINKAGE_TYPE = "ontologyLinkageType";
-
-  public static final String ONTOLOGY_W = "ontology_w";
-
-  public static final String PROCESS_TYPE = "processingType";
-
-  /** Defined on CLI */
-  public static final String RAW_METADATA_PATH = "raw_metadataPath";
-
-  public static final String RAW_METADATA_TYPE = "raw_metadataType";
-
-  public static final String SEARCH_F = "searchf";
-
-  public static final String SENDING_RATE = "sendingrate";
-
-  public static final String SESSION_PORT = "SessionPort";
-
-  public static final String SESSION_STATS_PREFIX = "SessionStats_prefix";
-
-  public static final String SESSION_URL = "SessionUrl";
-
-  public static final String SPARK_APP_NAME = "spark.app.name";
-
-  public static final String SPARK_MASTER = "spark.master";
-  /**
-   * Absolute local location of javaSVMWithSGDModel directory. This is typically
-   * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code>
-   */
-  public static final String SVM_SGD_MODEL = "svmSgdModel";
-
-  public static final String TIMEGAP = "timegap";
-
-  public static final String TIME_SUFFIX = "TimeSuffix";
-
-  public static final String USE_HISTORY_LINKAGE_TYPE = "userHistoryLinkageType";
-
-  public static final String USER_HISTORY_W = "userHistory_w";
-
-  public static final String VIEW_F = "viewf";
-
-}
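
The constants above are property keys, not values; the values come from
config.xml once MudrodEngine.loadConfig() has run. A sketch of the lookup
pattern, mirroring makeClient() earlier in this patch:

    import java.util.Properties;
    import gov.nasa.jpl.mudrod.main.MudrodConstants;
    import gov.nasa.jpl.mudrod.main.MudrodEngine;

    public class ConfigSketch {
      public static void main(String[] args) {
        Properties props = new MudrodEngine().loadConfig();
        String cluster = props.getProperty(MudrodConstants.ES_CLUSTER); // key "clusterName"
        int tcpPort = Integer.parseInt(
            props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT));  // key "ES_Transport_TCP_Port"
        System.out.println(cluster + ":" + tcpPort);
      }
    }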

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java
deleted file mode 100644
index 27dba84..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.main;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryEngineAbstract;
-import gov.nasa.jpl.mudrod.discoveryengine.MetadataDiscoveryEngine;
-import gov.nasa.jpl.mudrod.discoveryengine.OntologyDiscoveryEngine;
-import gov.nasa.jpl.mudrod.discoveryengine.RecommendEngine;
-import gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.integration.LinkageIntegration;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.jdom2.Document;
-import org.jdom2.Element;
-import org.jdom2.JDOMException;
-import org.jdom2.input.SAXBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Properties;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import static gov.nasa.jpl.mudrod.main.MudrodConstants.DATA_DIR;
-
-/**
- * Main entry point for running the Mudrod system. Invocation of this class is
- * tightly linked to the primary Mudrod configuration which can be located at
- * <a href=
- * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>.
- */
-public class MudrodEngine {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MudrodEngine.class);
-  private Properties props = new Properties();
-  private ESDriver es = null;
-  private SparkDriver spark = null;
-  private static final String LOG_INGEST = "logIngest";
-  private static final String META_INGEST = "metaIngest";
-  private static final String FULL_INGEST = "fullIngest";
-  private static final String PROCESSING = "processingWithPreResults";
-  private static final String ES_HOST = "esHost";
-  private static final String ES_TCP_PORT = "esTCPPort";
-  private static final String ES_HTTP_PORT = "esPort";
-
-  /**
-   * Public constructor for this class.
-   */
-  public MudrodEngine() {
-    // default constructor
-  }
-
-  /**
-   * Start the {@link ESDriver}. Should only be called after call to
-   * {@link MudrodEngine#loadConfig()}
-   *
-   * @return fully provisioned {@link ESDriver}
-   */
-  public ESDriver startESDriver() {
-    return new ESDriver(props);
-  }
-
-  /**
-   * Start the {@link SparkDriver}. Should only be called after call to
-   * {@link MudrodEngine#loadConfig()}
-   *
-   * @return fully provisioned {@link SparkDriver}
-   */
-  public SparkDriver startSparkDriver() {
-    return new SparkDriver(props);
-  }
-
-  /**
-   * Retrieve the Mudrod configuration as a Properties Map containing K, V of
-   * type String.
-   *
-   * @return a {@link java.util.Properties} object
-   */
-  public Properties getConfig() {
-    return props;
-  }
-
-  /**
-   * Retrieve the Mudrod {@link ESDriver}
-   *
-   * @return the {@link ESDriver} instance.
-   */
-  public ESDriver getESDriver() {
-    return this.es;
-  }
-
-  /**
-   * Set the Elasticsearch driver for MUDROD
-   *
-   * @param es
-   *          an ES driver instance
-   */
-  public void setESDriver(ESDriver es) {
-    this.es = es;
-  }
-
-  private InputStream locateConfig() {
-
-    String configLocation = System.getenv(MudrodConstants.MUDROD_CONFIG) == null ? "" : System.getenv(MudrodConstants.MUDROD_CONFIG);
-    File configFile = new File(configLocation);
-
-    try {
-      InputStream configStream = new FileInputStream(configFile);
-      LOG.info("Loaded config file from " + configFile.getAbsolutePath());
-      return configStream;
-    } catch (IOException e) {
-      LOG.info("File specified by environment variable " + MudrodConstants.MUDROD_CONFIG + "=\'" + configLocation + "\' could not be loaded. " + e.getMessage());
-    }
-
-    InputStream configStream = MudrodEngine.class.getClassLoader().getResourceAsStream("config.xml");
-
-    if (configStream != null) {
-      LOG.info("Loaded config file from {}", MudrodEngine.class.getClassLoader().getResource("config.xml").getPath());
-    }
-
-    return configStream;
-  }
-
-  /**
-   * Load the configuration provided at <a href=
-   * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>.
-   *
-   * @return a populated {@link java.util.Properties} object.
-   */
-  public Properties loadConfig() {
-    SAXBuilder saxBuilder = new SAXBuilder();
-
-    InputStream configStream = locateConfig();
-
-    Document document;
-    try {
-      document = saxBuilder.build(configStream);
-      Element rootNode = document.getRootElement();
-      List<Element> paraList = rootNode.getChildren("para");
-
-      for (int i = 0; i < paraList.size(); i++) {
-        Element paraNode = paraList.get(i);
-        String attributeName = paraNode.getAttributeValue("name");
-        if (MudrodConstants.SVM_SGD_MODEL.equals(attributeName)) {
-          props.put(attributeName, decompressSVMWithSGDModel(paraNode.getTextTrim()));
-        } else {
-          props.put(attributeName, paraNode.getTextTrim());
-        }
-      }
-    } catch (JDOMException | IOException e) {
-      LOG.error("Exception whilst retrieving or processing XML contained within 'config.xml'!", e);
-    }
-    return getConfig();
-
-  }
-
-  private String decompressSVMWithSGDModel(String archiveName) throws IOException {
-
-    URL scmArchive = getClass().getClassLoader().getResource(archiveName);
-    if (scmArchive == null) {
-      throw new IOException("Unable to locate " + archiveName + " as a classpath resource.");
-    }
-    File tempDir = Files.createTempDirectory("mudrod").toFile();
-    // Do not use assert here: assert statements are skipped entirely when
-    // assertions are disabled, so the setWritable side effect would be lost.
-    if (!tempDir.setWritable(true)) {
-      LOG.warn("Unable to mark '{}' writable.", tempDir.getAbsolutePath());
-    }
-    File archiveFile = new File(tempDir, archiveName);
-    FileUtils.copyURLToFile(scmArchive, archiveFile);
-
-    // Decompress the archive into tempDir; try-with-resources closes the stream.
-    final int bufferSize = 512000;
-    try (ZipInputStream zipIn = new ZipInputStream(new FileInputStream(archiveFile))) {
-      ZipEntry entry;
-      while ((entry = zipIn.getNextEntry()) != null) {
-        File f = new File(tempDir, entry.getName());
-        // If the entry is a directory, create the directory.
-        if (entry.isDirectory() && !f.exists()) {
-          boolean created = f.mkdirs();
-          if (!created) {
-            LOG.error("Unable to create directory '{}', during extraction of archive contents.", f.getAbsolutePath());
-          }
-        } else if (!entry.isDirectory()) {
-          boolean created = f.getParentFile().mkdirs();
-          if (!created && !f.getParentFile().exists()) {
-            LOG.error("Unable to create directory '{}', during extraction of archive contents.", f.getParentFile().getAbsolutePath());
-          }
-          int count;
-          byte[] data = new byte[bufferSize];
-          FileOutputStream fos = new FileOutputStream(f, false);
-          try (BufferedOutputStream dest = new BufferedOutputStream(fos, bufferSize)) {
-            while ((count = zipIn.read(data, 0, bufferSize)) != -1) {
-              dest.write(data, 0, count);
-            }
-          }
-        }
-      }
-    }
-
-    return new File(tempDir, StringUtils.removeEnd(archiveName, ".zip")).toURI().toString();
-  }
-
-  /**
-   * Preprocess and process web logs using the
-   * {@link DiscoveryEngineAbstract} implementation for weblogs.
-   */
-  public void startLogIngest() {
-    DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
-    wd.preprocess();
-    wd.process();
-    LOG.info("*****************logs have been ingested successfully******************");
-  }
-
-  /**
-   * Preprocess and process metadata, then compute metadata-to-metadata
-   * similarity and recommendation results.
-   */
-  public void startMetaIngest() {
-    DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark);
-    md.preprocess();
-    md.process();
-
-    DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark);
-    recom.preprocess();
-    recom.process();
-    LOG.info("Metadata has been ingested successfully.");
-  }
-
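-  /**
-   * Preprocess and process both web logs and metadata, then build
-   * recommendation results.
-   */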
-  public void startFullIngest() {
-    DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
-    wd.preprocess();
-    wd.process();
-
-    DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark);
-    md.preprocess();
-    md.process();
-
-    DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark);
-    recom.preprocess();
-    recom.process();
-    LOG.info("Full ingest has finished successfully.");
-  }
-
-  /**
-   * Run processing only for the weblog, ontology and metadata
-   * {@link DiscoveryEngineAbstract} implementations, followed by linkage
-   * integration and recommendation. Assumes weblog preprocessing results
-   * already exist in the data directory.
-   */
-  public void startProcessing() {
-    DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
-    wd.process();
-
-    DiscoveryEngineAbstract od = new OntologyDiscoveryEngine(props, es, spark);
-    od.preprocess();
-    od.process();
-
-    DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark);
-    md.preprocess();
-    md.process();
-
-    LinkageIntegration li = new LinkageIntegration(props, es, spark);
-    li.execute();
-
-    DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark);
-    recom.process();
-  }
-
-  /**
-   * Close the connection to the {@link ESDriver} instance.
-   */
-  public void end() {
-    if (es != null) {
-      es.close();
-    }
-  }
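
The entry points above share a simple lifecycle: load configuration, attach
the Elasticsearch and Spark drivers, run one of the start* methods, then call
end(). A minimal sketch mirroring what main() does below (setESDriver is an
assumed counterpart to the setSparkDriver(...) method shown at the end of this
class; main() itself assigns the field directly):

    MudrodEngine me = new MudrodEngine();
    me.loadConfig();
    me.setESDriver(new ESDriver(me.getConfig()));      // assumed setter
    me.setSparkDriver(new SparkDriver(me.getConfig()));
    me.startFullIngest();  // or startLogIngest(), startMetaIngest(), startProcessing()
    me.end();              // closes the Elasticsearch connection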
-
-  /**
-   * Main program invocation. Accepts command line options specifying the
-   * data directory to be processed and the ingest method to run. Help will
-   * be provided if invoked with incorrect parameters.
-   *
-   * @param args
-   *          {@link java.lang.String} array containing the correct parameters.
-   */
-  public static void main(String[] args) {
-    // boolean options
-    Option helpOpt = new Option("h", "help", false, "show this help message");
-
-    // log ingest (preprocessing + processing)
-    Option logIngestOpt = new Option("l", LOG_INGEST, false, "begin log ingest");
-    // metadata ingest (preprocessing + processing)
-    Option metaIngestOpt = new Option("m", META_INGEST, false, "begin metadata ingest");
-    // ingest both log and metadata
-    Option fullIngestOpt = new Option("f", FULL_INGEST, false, "begin full Mudrod ingest workflow");
-    // processing only, assuming that preprocessing results are in dataDir
-    Option processingOpt = new Option("p", PROCESSING, false, "begin processing with preprocessing results");
-
-    // argument options
-    Option dataDirOpt = OptionBuilder.hasArg(true).withArgName("/path/to/data/directory").hasArgs(1).withDescription("the data directory to be processed by Mudrod").withLongOpt("dataDirectory")
-        .isRequired().create(DATA_DIR);
-
-    Option esHostOpt = OptionBuilder.hasArg(true).withArgName("host_name").hasArgs(1).withDescription("elasticsearch cluster unicast host").withLongOpt("elasticSearchHost").isRequired(false)
-        .create(ES_HOST);
-
-    Option esTCPPortOpt = OptionBuilder.hasArg(true).withArgName("port_num").hasArgs(1).withDescription("elasticsearch transport TCP port").withLongOpt("elasticSearchTransportTCPPort")
-        .isRequired(false).create(ES_TCP_PORT);
-
-    Option esPortOpt = OptionBuilder.hasArg(true).withArgName("port_num").hasArgs(1).withDescription("elasticsearch HTTP/REST port").withLongOpt("elasticSearchHTTPPort").isRequired(false)
-        .create(ES_HTTP_PORT);
-
-    // create the options
-    Options options = new Options();
-    options.addOption(helpOpt);
-    options.addOption(logIngestOpt);
-    options.addOption(metaIngestOpt);
-    options.addOption(fullIngestOpt);
-    options.addOption(processingOpt);
-    options.addOption(dataDirOpt);
-    options.addOption(esHostOpt);
-    options.addOption(esTCPPortOpt);
-    options.addOption(esPortOpt);
-
-    CommandLineParser parser = new GnuParser();
-    try {
-      CommandLine line = parser.parse(options, args);
-      String processingType = null;
-
-      if (line.hasOption(LOG_INGEST)) {
-        processingType = LOG_INGEST;
-      } else if (line.hasOption(PROCESSING)) {
-        processingType = PROCESSING;
-      } else if (line.hasOption(META_INGEST)) {
-        processingType = META_INGEST;
-      } else if (line.hasOption(FULL_INGEST)) {
-        processingType = FULL_INGEST;
-      }
-
-      String dataDir = line.getOptionValue(DATA_DIR).replace("\\", "/");
-      if (!dataDir.endsWith("/")) {
-        dataDir += "/";
-      }
-
-      MudrodEngine me = new MudrodEngine();
-      me.loadConfig();
-      me.props.put(DATA_DIR, dataDir);
-
-      if (line.hasOption(ES_HOST)) {
-        String esHost = line.getOptionValue(ES_HOST);
-        me.props.put(MudrodConstants.ES_UNICAST_HOSTS, esHost);
-      }
-
-      if (line.hasOption(ES_TCP_PORT)) {
-        String esTcpPort = line.getOptionValue(ES_TCP_PORT);
-        me.props.put(MudrodConstants.ES_TRANSPORT_TCP_PORT, esTcpPort);
-      }
-
-      if (line.hasOption(ES_HTTP_PORT)) {
-        String esHttpPort = line.getOptionValue(ES_HTTP_PORT);
-        me.props.put(MudrodConstants.ES_HTTP_PORT, esHttpPort);
-      }
-
-      me.es = new ESDriver(me.getConfig());
-      me.spark = new SparkDriver(me.getConfig());
-      loadFullConfig(me, dataDir);
-      if (processingType != null) {
-        switch (processingType) {
-        case PROCESSING:
-          me.startProcessing();
-          break;
-        case LOG_INGEST:
-          me.startLogIngest();
-          break;
-        case META_INGEST:
-          me.startMetaIngest();
-          break;
-        case FULL_INGEST:
-          me.startFullIngest();
-          break;
-        default:
-          break;
-        }
-      }
-      me.end();
-    } catch (Exception e) {
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("MudrodEngine: 'dataDir' argument is mandatory. User must also provide an ingest method.", options, true);
-      LOG.error("Error whilst parsing command line.", e);
-    }
-  }
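
For reference, a full ingest against a local Elasticsearch instance might be
launched roughly as follows, using the long option names defined above (the
jar name is an assumed assembly artifact, and the class name is the
pre-rename one shown in this diff):

    java -cp mudrod-core-with-dependencies.jar gov.nasa.jpl.mudrod.main.MudrodEngine \
      -f --dataDirectory /path/to/data --elasticSearchHost localhost \
      --elasticSearchTransportTCPPort 9300 --elasticSearchHTTPPort 9200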
-
-  private static void loadFullConfig(MudrodEngine me, String dataDir) {
-    //TODO all of the properties defined below, which are determined at
-    //runtime, need to be added to MudrodConstants.java and referenced
-    //accordingly and consistently via Properties.getProperty(MudrodConstant...);
-    me.props.put("ontologyInputDir", dataDir + "SWEET_ocean/");
-    me.props.put("oceanTriples", dataDir + "Ocean_triples.csv");
-    me.props.put("userHistoryMatrix", dataDir + "UserHistoryMatrix.csv");
-    me.props.put("clickstreamMatrix", dataDir + "ClickstreamMatrix.csv");
-    me.props.put("metadataMatrix", dataDir + "MetadataMatrix.csv");
-    me.props.put("clickstreamSVDMatrix_tmp", dataDir + "clickstreamSVDMatrix_tmp.csv");
-    me.props.put("metadataSVDMatrix_tmp", dataDir + "metadataSVDMatrix_tmp.csv");
-    me.props.put("raw_metadataPath", dataDir + me.props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
-
-    me.props.put("jtopia", dataDir + "jtopiaModel");
-    me.props.put("metadata_term_tfidf_matrix", dataDir + "metadata_term_tfidf.csv");
-    me.props.put("metadata_word_tfidf_matrix", dataDir + "metadata_word_tfidf.csv");
-    me.props.put("session_metadata_Matrix", dataDir + "metadata_session_coocurrence_matrix.csv");
-
-    me.props.put("metadataOBCode", dataDir + "MetadataOHCode");
-    me.props.put("metadata_topic", dataDir + "metadata_topic");
-    me.props.put("metadata_topic_matrix", dataDir + "metadata_topic_matrix.csv");
-  }
-
-  /**
-   * Obtain the spark implementation.
-   *
-   * @return the {@link SparkDriver}
-   */
-  public SparkDriver getSparkDriver() {
-    return this.spark;
-  }
-
-  /**
-   * Set the {@link SparkDriver}
-   *
-   * @param sparkDriver
-   *          a configured {@link SparkDriver}
-   */
-  public void setSparkDriver(SparkDriver sparkDriver) {
-    this.spark = sparkDriver;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java
deleted file mode 100644
index 44d0518..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes the main entry point for running the Mudrod system.
- */
-package gov.nasa.jpl.mudrod.main;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java
deleted file mode 100644
index 1bb0a3c..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes metadata pre-processing, processing, and data structure
- * classes.
- */
-package gov.nasa.jpl.mudrod.metadata;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java
deleted file mode 100644
index bc7d187..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.metadata.pre;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.utils.HttpRequest;
-import org.apache.commons.io.IOUtils;
-import org.elasticsearch.action.index.IndexRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Properties;
-
-/**
- * ClassName: ApiHarvester Function: Harvest metadata from the PO.DAAC web service.
- */
-public class ApiHarvester extends DiscoveryStepAbstract {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(ApiHarvester.class);
-
-  /**
-   * Creates a new instance of ApiHarvester.
-   *
-   * @param props the Mudrod configuration
-   * @param es    the Elasticsearch driver
-   * @param spark the Spark driver
-   */
-  public ApiHarvester(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute() {
-    LOG.info("Starting Metadata harvesting.");
-    startTime = System.currentTimeMillis();
-    //remove old metadata from ES
-    es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
-    //harvest new metadata using PO.DAAC web services
-    harvestMetadatafromWeb();
-    es.createBulkProcessor();
-    addMetadataMapping();
-    importToES();
-    es.destroyBulkProcessor();
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("Metadata harvesting completed. Time elapsed: {}", (endTime - startTime) / 1000);
-    return null;
-  }
-
-  /**
-   * addMetadataMapping: Add a mapping for indexing metadata in Elasticsearch.
-   * Invoke this method before importing metadata into Elasticsearch.
-   */
-  public void addMetadataMapping() {
-    String mappingJson = "{\r\n   \"dynamic_templates\": " + "[\r\n      " + "{\r\n         \"strings\": " + "{\r\n            \"match_mapping_type\": \"string\","
-        + "\r\n            \"mapping\": {\r\n               \"type\": \"text\"," + "\r\n               \"fielddata\": true," + "\r\n               \"analyzer\": \"english\","
-        + "\r\n            \"fields\": {\r\n               \"raw\": {" + "\r\n               \"type\": \"string\"," + "\r\n               \"index\": \"not_analyzed\"" + "\r\n            }"
-        + "\r\n         }\r\n " + "\r\n            }" + "\r\n         }\r\n      }\r\n   ]\r\n}";
-
-    es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(props.getProperty(MudrodConstants.RAW_METADATA_TYPE)).setSource(mappingJson).execute()
-        .actionGet();
-  }
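
Unescaped and indented, the dynamic-template mapping assembled above reads:

    {
      "dynamic_templates": [
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "text",
              "fielddata": true,
              "analyzer": "english",
              "fields": {
                "raw": {
                  "type": "string",
                  "index": "not_analyzed"
                }
              }
            }
          }
        }
      ]
    }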
-
-  /**
-   * importToES: Index metadata into Elasticsearch from the local file
-   * directory. Ensure metadata has been harvested from the web service
-   * before invoking this method.
-   */
-  private void importToES() {
-    File directory = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
-    if (!directory.exists() && !directory.mkdirs()) {
-      LOG.error("Unable to create directory '{}'.", directory.getAbsolutePath());
-      return;
-    }
-    // listFiles() returns null on I/O error, so guard before iterating.
-    File[] fList = directory.listFiles();
-    if (fList == null) {
-      return;
-    }
-    for (File file : fList) {
-      InputStream is;
-      try {
-        is = new FileInputStream(file);
-        importSingleFileToES(is);
-      } catch (FileNotFoundException e) {
-        LOG.error("Error finding file!", e);
-      }
-
-    }
-  }
-
-  private void importSingleFileToES(InputStream is) {
-    try {
-      String jsonTxt = IOUtils.toString(is);
-      JsonParser parser = new JsonParser();
-      JsonElement item = parser.parse(jsonTxt);
-      IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE)).source(item.toString());
-      es.getBulkProcessor().add(ir);
-    } catch (IOException e) {
-      LOG.error("Error indexing metadata record!", e);
-    }
-  }
-
-  /**
-   * harvestMetadatafromWeb: Harvest metadata from PO.DAAC web service.
-   */
-  private void harvestMetadatafromWeb() {
-    LOG.info("Metadata download started.");
-    int startIndex = 0;
-    int docLength = 0;
-    JsonParser parser = new JsonParser();
-    do {
-      String searchAPI = "https://podaac.jpl.nasa.gov/api/dataset?startIndex=" + Integer.toString(startIndex) + "&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search=";
-      HttpRequest http = new HttpRequest();
-      String response = http.getRequest(searchAPI);
-
-      JsonElement json = parser.parse(response);
-      JsonObject responseObject = json.getAsJsonObject();
-      JsonArray docs = responseObject.getAsJsonObject("response").getAsJsonArray("docs");
-
-      docLength = docs.size();
-
-      File file = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
-      if (!file.exists()) {
-        if (file.mkdirs()) {
-          LOG.info("Created directory '{}'.", file.getAbsolutePath());
-        } else {
-          LOG.error("Failed to create directory '{}'!", file.getAbsolutePath());
-        }
-      }
-      for (int i = 0; i < docLength; i++) {
-        JsonElement item = docs.get(i);
-        int docId = startIndex + i;
-        File itemfile = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH) + "/" + docId + ".json");
-
-        // FileWriter creates the file itself, so the redundant createNewFile()
-        // call (whose return value was ignored) is dropped.
-        try (FileWriter fw = new FileWriter(itemfile.getAbsoluteFile()); BufferedWriter bw = new BufferedWriter(fw)) {
-          bw.write(item.toString());
-        } catch (IOException e) {
-          LOG.error("Error writing metadata to local file!", e);
-        }
-      }
-
-      startIndex += 10;
-
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOG.error("Thread interrupted whilst throttling metadata download!", e);
-        Thread.currentThread().interrupt();
-      }
-
-    } while (docLength != 0);
-
-    LOG.info("Metadata download finished.");
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}
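
As with the other DiscoveryStepAbstract implementations, the harvester is
driven through execute(); a minimal sketch, assuming props, es and spark are
already configured:

    DiscoveryStepAbstract harvester = new ApiHarvester(props, es, spark);
    harvester.execute();  // clears old metadata, harvests from PO.DAAC, indexes into ES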

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java
deleted file mode 100644
index 63677e5..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.metadata.pre;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.metadata.structure.MetadataExtractor;
-import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
-import gov.nasa.jpl.mudrod.utils.MatrixUtil;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Generate a term-metadata matrix from the original metadata. Each row of
- * the matrix corresponds to a term, and each column to a metadata record.
- */
-public class MatrixGenerator extends DiscoveryStepAbstract {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(MatrixGenerator.class);
-
-  /**
-   * Creates a new instance of MatrixGenerator.
-   *
-   * @param props the Mudrod configuration
-   * @param es    the Elasticsearch driver
-   * @param spark the Spark driver
-   */
-  public MatrixGenerator(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  /**
-   * Generate a CSV file containing the term-metadata matrix produced from
-   * the original metadata.
-   *
-   * @see DiscoveryStepAbstract#execute()
-   */
-  @Override
-  public Object execute() {
-    LOG.info("Metadata matrix started");
-    startTime = System.currentTimeMillis();
-
-    String metadataMatrixFile = props.getProperty("metadataMatrix");
-    try {
-      MetadataExtractor extractor = new MetadataExtractor();
-      JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
-      LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataTermsRDD);
-      MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, metadataMatrixFile);
-
-    } catch (Exception e) {
-      LOG.error("Error during Metadata matrix generaion: {}", e);
-    }
-
-    endTime = System.currentTimeMillis();
-    LOG.info("Metadata matrix finished time elapsed: {}s", (endTime - startTime) / 1000);
-    return null;
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}
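
Assuming MatrixUtil.exportToCSV writes the column keys (metadata identifiers)
as a header row and one term per subsequent row, the exported file has a
shape like this purely hypothetical two-dataset illustration:

    term,dataset-A,dataset-B
    ocean,12.0,3.0
    salinity,0.0,7.0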

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java
deleted file mode 100644
index 60a2429..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes metadata pre-processing functions.
- */
-package gov.nasa.jpl.mudrod.metadata.pre;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java
deleted file mode 100644
index 67dbf2d..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.metadata.process;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer;
-import gov.nasa.jpl.mudrod.utils.LinkageTriple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * ClassName: MetadataAnalyzer
- * Function: Calculate semantic relationships among vocabularies extracted
- * from metadata.
- */
-public class MetadataAnalyzer extends DiscoveryStepAbstract implements Serializable {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(MetadataAnalyzer.class);
-
-  /**
-   * Creates a new instance of MetadataAnalyzer.
-   *
-   * @param props the Mudrod configuration
-   * @param es    the Elasticsearch driver
-   * @param spark the Spark driver
-   */
-  public MetadataAnalyzer(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  /**
-   * Calculate semantic relationship of vocabularies from a csv file which is a
-   * term-metadata matrix.
-   *
-   * @see DiscoveryStepAbstract#execute()
-   */
-  @Override
-  public Object execute() {
-    try {
-      LOG.info("*****************Metadata Analyzer starts******************");
-      startTime = System.currentTimeMillis();
-
-      SVDAnalyzer analyzer = new SVDAnalyzer(props, es, spark);
-      int svdDimension = Integer.parseInt(props.getProperty("metadataSVDDimension"));
-      String metadataMatrixFile = props.getProperty("metadataMatrix");
-      String svdMatrixFileName = props.getProperty("metadataSVDMatrix_tmp");
-
-      analyzer.getSVDMatrix(metadataMatrixFile, svdDimension, svdMatrixFileName);
-      List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(svdMatrixFileName);
-
-      analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataLinkageType"));
-
-    } catch (Exception e) {
-      LOG.error("Error during metadata analysis!", e);
-    }
-
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("*****************Metadata Analyzer ends******************Took {}s", (endTime - startTime) / 1000);
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java
deleted file mode 100644
index c988f31..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
- * may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes metadata processing classes.
- */
-package gov.nasa.jpl.mudrod.metadata.process;
\ No newline at end of file