You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/12/19 14:13:17 UTC
[16/17] incubator-sdap-mudrod git commit: SDAP-7 Change all package
namespaces to org.apache.sdap
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
deleted file mode 100644
index d1af04a..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.driver;
-
-import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import com.google.gson.GsonBuilder;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.main.MudrodEngine;
-import gov.nasa.jpl.mudrod.utils.ESTransportClient;
-import org.apache.commons.lang.StringUtils;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.unit.ByteSizeUnit;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.Fuzziness;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.suggest.Suggest;
-import org.elasticsearch.search.suggest.SuggestBuilder;
-import org.elasticsearch.search.suggest.SuggestBuilders;
-import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-/**
- * Driver implementation for all Elasticsearch functionality.
- */
-public class ESDriver implements Serializable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class);
- private static final long serialVersionUID = 1L;
- private transient Client client = null;
- private transient Node node = null;
- private transient BulkProcessor bulkProcessor = null;
-
- /**
- * Default constructor for this class. To load client configuration call
- * substantiated constructor.
- */
- public ESDriver() {
- // Default constructor, to load configuration call ESDriver(props)
- }
-
- /**
- * Substantiated constructor which accepts a {@link java.util.Properties}
- *
- * @param props a populated properties object.
- */
- public ESDriver(Properties props) {
- try {
- setClient(makeClient(props));
- } catch (IOException e) {
- LOG.error("Error whilst constructing Elastcisearch client.", e);
- }
- }
-
- public void createBulkProcessor() {
- LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500);
- setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long executionId, BulkRequest request) {
- LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!");
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
- LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!");
- }
-
- @Override
- public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
- LOG.error("Bulk request has failed!");
- throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure);
- }
- }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1)
- .build());
- }
-
- public void destroyBulkProcessor() {
- try {
- getBulkProcessor().awaitClose(20, TimeUnit.MINUTES);
- setBulkProcessor(null);
- refreshIndex();
- } catch (InterruptedException e) {
- LOG.error("Error destroying the Bulk Processor.", e);
- }
- }
-
- public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException {
-
- boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists();
- if (exists) {
- return;
- }
-
- getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
- getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet();
- }
-
- public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException {
- return this.customAnalyzing(indexName, "cody", str);
- }
-
- public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException {
- String[] strList = str.toLowerCase().split(",");
- for (int i = 0; i < strList.length; i++) {
- String tmp = "";
- AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get();
- for (AnalyzeToken token : r.getTokens()) {
- tmp += token.getTerm() + " ";
- }
- strList[i] = tmp.trim();
- }
- return String.join(",", strList);
- }
-
- public List<String> customAnalyzing(String indexName, List<String> list) throws InterruptedException, ExecutionException {
- if (list == null) {
- return list;
- }
- int size = list.size();
- List<String> customlist = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- customlist.add(this.customAnalyzing(indexName, list.get(i)));
- }
-
- return customlist;
- }
-
- public void deleteAllByQuery(String index, String type, QueryBuilder query) {
- createBulkProcessor();
- SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
- .actionGet();
-
- while (true) {
- for (SearchHit hit : scrollResp.getHits().getHits()) {
- DeleteRequest deleteRequest = new DeleteRequest(index, type, hit.getId());
- getBulkProcessor().add(deleteRequest);
- }
-
- scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
- if (scrollResp.getHits().getHits().length == 0) {
- break;
- }
-
- }
- destroyBulkProcessor();
- }
-
- public void deleteType(String index, String type) {
- this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery());
- }
-
- public List<String> getTypeListWithPrefix(Object object, Object object2) {
- ArrayList<String> typeList = new ArrayList<>();
- GetMappingsResponse res;
- try {
- res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get();
- ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(object.toString());
- for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
- if (c.key.startsWith(object2.toString())) {
- typeList.add(c.key);
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e);
- }
- return typeList;
- }
-
- public List<String> getIndexListWithPrefix(Object object) {
-
- LOG.info("Retrieving index list with prefix: {}", object.toString());
- String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
-
- ArrayList<String> indexList = new ArrayList<>();
- int length = indices.length;
- for (int i = 0; i < length; i++) {
- String indexName = indices[i];
- if (indexName.startsWith(object.toString())) {
- indexList.add(indexName);
- }
- }
-
- return indexList;
- }
-
- public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException {
- return searchByQuery(index, type, query, false);
- }
-
- @SuppressWarnings("unchecked")
- public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException {
- boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
- if (!exists) {
- return null;
- }
-
- QueryBuilder qb = QueryBuilders.queryStringQuery(query);
- SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet();
-
- // Map of K,V pairs where key is the field name from search result and value is the that should be returned for that field. Not always the same.
- Map<String, String> fieldsToReturn = new HashMap<>();
-
- fieldsToReturn.put("Dataset-ShortName", "Short Name");
- fieldsToReturn.put("Dataset-LongName", "Long Name");
- fieldsToReturn.put("DatasetParameter-Topic", "Topic");
- fieldsToReturn.put("Dataset-Description", "Dataset-Description");
- fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date");
-
- if (bDetail) {
- fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat");
- fieldsToReturn.put("Dataset-Doi", "Dataset-Doi");
- fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level");
- fieldsToReturn.put("DatasetCitation-Version", "Version");
- fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName");
- fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName");
- fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category");
- fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath");
- fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full");
- fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full");
- fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail");
-
- fieldsToReturn.put("DatasetRegion-Region", "Region");
- fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat");
- fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat");
- fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon");
- fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon");
- fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long");
- fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong");
-
- fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution");
- fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat");
- fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution");
- fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution");
- fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution");
- fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution");
- }
-
- List<Map<String, Object>> searchResults = new ArrayList<>();
-
- for (SearchHit hit : response.getHits().getHits()) {
- Map<String, Object> source = hit.getSource();
-
- Map<String, Object> searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey()))
- .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue));
-
- // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result
-
- // Some results require special handling/formatting:
- // Release Date formatting
- LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList<String>) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate();
- searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE));
-
- if (bDetail) {
-
- // DataFormat value, translate RAW to BINARY
- if ("RAW".equals(searchResult.get("DataFormat"))) {
- searchResult.put("DataFormat", "BINARY");
- }
-
- // DatasetLocationPolicy-BasePath Should only contain ftp, http, or https URLs
- List<String> urls = ((List<String>) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList());
- searchResult.put("DatasetLocationPolicy-BasePath", urls);
-
- // Time Span Formatting
- LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate();
- LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ?
- null :
- Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate();
- searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE)));
-
- // Temporal resolution can come from one of two fields
- searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution"));
-
- // Special formatting for spatial resolution
- String latResolution = (String) searchResult.get("Dataset-LatitudeResolution");
- String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution");
- if (!latResolution.isEmpty() && !lonResolution.isEmpty()) {
- searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)");
- } else {
- String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution");
- String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution");
- double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000;
- double dAlonResolution = Double.parseDouble(alonResolution) / 1000;
- searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)");
- }
-
- // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies.
- List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"),
- (List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));
-
- searchResult.put("Measurements", measurements);
-
- }
-
- searchResults.add(searchResult);
- }
-
- Map<String, List<?>> pdResults = new HashMap<>();
- pdResults.put("PDResults", searchResults);
-
- return new GsonBuilder().create().toJson(pdResults);
- }
-
- /**
- * Builds a List of Measurement Hierarchies given the individual source lists.
- * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail
- * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete.
- *
- * For example, if the input is:
- * <pre>
- * topics = ["Oceans", "Oceans"]
- * terms = ["Sea Surface Topography", "Ocean Waves"]
- * variables = ["Sea Surface Height", "Significant Wave Height"]
- * variableDetails = ["None", "None"]
- * </pre>
- *
- * The output would be:
- * <pre>
- * [
- * ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
- * ["Oceans", "Ocean Waves", "Significant Wave Height"]
- * ]
- * </pre>
- * Oceans > Sea Surface Topography > Sea Surface Height
- * Oceans > Ocean Waves > Significant Wave Height
- *
- * @param topics List of topics, the first element of a measurement
- * @param terms List of terms, the second element of a measurement
- * @param variables List of variables, the third element of a measurement
- * @param variableDetails List of variable details, the fourth element of a measurement
- *
- * @return A List where each element is a single hierarchy (as a List) built from the provided input lists.
- */
- private List<List<String>> buildMeasurementHierarchies(List<String> topics, List<String> terms, List<String> variables, List<String> variableDetails) {
-
- List<List<String>> measurements = new ArrayList<>();
-
- for (int x = 0; x < topics.size(); x++) {
- measurements.add(new ArrayList<>());
- measurements.get(x).add(topics.get(x));
- // Only add the next 'level' if we can
- if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) {
- measurements.get(x).add(terms.get(x));
- if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) {
- measurements.get(x).add(variables.get(x));
- if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) {
- measurements.get(x).add(variableDetails.get(x));
- }
- }
- }
- }
-
- return measurements;
-
- }
-
- public List<String> autoComplete(String index, String term) {
- boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
- if (!exists) {
- return new ArrayList<>();
- }
-
- Set<String> suggestHS = new HashSet<String>();
- List<String> suggestList = new ArrayList<>();
-
- // please make sure that the completion field is configured in the ES mapping
- CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100);
- SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
- SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet();
-
- Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator();
-
- while (iterator.hasNext()) {
- Suggest.Suggestion.Entry.Option next = iterator.next();
- String suggest = next.getText().string().toLowerCase();
- suggestList.add(suggest);
- }
-
- suggestHS.addAll(suggestList);
- suggestList.clear();
- suggestList.addAll(suggestHS);
- return suggestList;
- }
-
- public void close() {
- client.close();
- }
-
- public void refreshIndex() {
- client.admin().indices().prepareRefresh().execute().actionGet();
- }
-
- /**
- * Generates a TransportClient or NodeClient
- *
- * @param props a populated {@link java.util.Properties} object
- * @return a constructed {@link org.elasticsearch.client.Client}
- * @throws IOException if there is an error building the
- * {@link org.elasticsearch.client.Client}
- */
- protected Client makeClient(Properties props) throws IOException {
- String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER);
- String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
- String[] hosts = hostsString.split(",");
- String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT);
- int port = Integer.parseInt(portStr);
-
- Settings.Builder settingsBuilder = Settings.builder();
-
- // Set the cluster name and build the settings
- if (!clusterName.isEmpty())
- settingsBuilder.put("cluster.name", clusterName);
-
- settingsBuilder.put("http.type", "netty3");
- settingsBuilder.put("transport.type", "netty3");
-
- Settings settings = settingsBuilder.build();
-
- Client client = null;
-
- // Prefer TransportClient
- if (hosts != null && port > 1) {
- TransportClient transportClient = new ESTransportClient(settings);
- for (String host : hosts)
- transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
- client = transportClient;
- } else if (clusterName != null) {
- node = new Node(settings);
- client = node.client();
- }
-
- return client;
- }
-
- /**
- * Main method used to invoke the ESDriver implementation.
- *
- * @param args no arguments are required to invoke the Driver.
- */
- public static void main(String[] args) {
- MudrodEngine mudrodEngine = new MudrodEngine();
- ESDriver es = new ESDriver(mudrodEngine.loadConfig());
- es.getTypeListWithPrefix("podaacsession", "sessionstats");
- }
-
- /**
- * @return the client
- */
- public Client getClient() {
- return client;
- }
-
- /**
- * @param client the client to set
- */
- public void setClient(Client client) {
- this.client = client;
- }
-
- /**
- * @return the bulkProcessor
- */
- public BulkProcessor getBulkProcessor() {
- return bulkProcessor;
- }
-
- /**
- * @param bulkProcessor the bulkProcessor to set
- */
- public void setBulkProcessor(BulkProcessor bulkProcessor) {
- this.bulkProcessor = bulkProcessor;
- }
-
- public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) {
-
- UpdateRequest ur = null;
- try {
- ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
- } catch (IOException e) {
- LOG.error("Error whilst attempting to generate a new Update Request.", e);
- }
-
- return ur;
- }
-
- public UpdateRequest generateUpdateRequest(String index, String type, String id, Map<String, Object> filedValueMap) {
-
- UpdateRequest ur = null;
- try {
- XContentBuilder builder = jsonBuilder().startObject();
- for (Entry<String, Object> entry : filedValueMap.entrySet()) {
- String key = entry.getKey();
- builder.field(key, filedValueMap.get(key));
- }
- builder.endObject();
- ur = new UpdateRequest(index, type, id).doc(builder);
- } catch (IOException e) {
- LOG.error("Error whilst attempting to generate a new Update Request.", e);
- }
-
- return ur;
- }
-
- public int getDocCount(String index, String... type) {
- MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
- String[] indexArr = new String[] { index };
- return this.getDocCount(indexArr, type, search);
- }
-
- public int getDocCount(String[] index, String[] type) {
- MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
- return this.getDocCount(index, type, search);
- }
-
- public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) {
- SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0);
- SearchResponse countSr = countSrBuilder.execute().actionGet();
- int docCount = (int) countSr.getHits().getTotalHits();
- return docCount;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
deleted file mode 100644
index e49c029..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.driver;
-
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.spark.sql.SQLContext;
-
-import java.io.File;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.util.Properties;
-//import org.apache.spark.sql.SparkSession;
-
-public class SparkDriver implements Serializable {
-
- //TODO the commented out code below is the API uprgade
- //for Spark 2.0.0. It requires a large upgrade and simplification
- //across the mudrod codebase so should be done in an individual ticket.
- // /**
- // *
- // */
- // private static final long serialVersionUID = 1L;
- // private SparkSession builder;
- //
- // public SparkDriver() {
- // builder = SparkSession.builder()
- // .master("local[2]")
- // .config("spark.hadoop.validateOutputSpecs", "false")
- // .config("spark.files.overwrite", "true")
- // .getOrCreate();
- // }
- //
- // public SparkSession getBuilder() {
- // return builder;
- // }
- //
- // public void setBuilder(SparkSession builder) {
- // this.builder = builder;
- // }
- //
- // public void close() {
- // builder.stop();
- // }
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
- public transient JavaSparkContext sc;
- public transient SQLContext sqlContext;
-
- public SparkDriver() {
- // empty default constructor
- }
-
- public SparkDriver(Properties props) {
- SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME, "MudrodSparkApp")).setIfMissing("spark.master", props.getProperty(MudrodConstants.SPARK_MASTER))
- .set("spark.hadoop.validateOutputSpecs", "false").set("spark.files.overwrite", "true");
-
- String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
- String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT);
-
- if (!"".equals(esHost)) {
- conf.set("es.nodes", esHost);
- }
-
- if (!"".equals(esPort)) {
- conf.set("es.port", esPort);
- }
-
- conf.set("spark.serializer", KryoSerializer.class.getName());
- conf.set("es.batch.size.entries", "1500");
-
- sc = new JavaSparkContext(conf);
- sqlContext = new SQLContext(sc);
- }
-
- public void close() {
- sc.sc().stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
deleted file mode 100644
index d9cde36..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes commonly used Elasticsearch and Spark related
- * functions
- */
-package gov.nasa.jpl.mudrod.driver;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
deleted file mode 100644
index 14b667d..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.integration;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.text.DecimalFormat;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-/**
- * Supports ability to integrate vocab similarity results from metadata, ontology, and web logs.
- */
-public class LinkageIntegration extends DiscoveryStepAbstract {
-
- private static final Logger LOG = LoggerFactory.getLogger(LinkageIntegration.class);
- private static final long serialVersionUID = 1L;
- transient List<LinkedTerm> termList = new ArrayList<>();
- DecimalFormat df = new DecimalFormat("#.00");
- private static final String INDEX_NAME = "indexName";
- private static final String WEIGHT = "weight";
-
- public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) {
- super(props, es, spark);
- }
-
- /**
- * The data structure to store semantic triple.
- */
- class LinkedTerm {
- String term = null;
- double weight = 0;
- String model = null;
-
- public LinkedTerm(String str, double w, String m) {
- term = str;
- weight = w;
- model = m;
- }
- }
-
- /**
- * Method of executing integration step
- */
- @Override
- public Object execute() {
- getIngeratedList("ocean wind", 11);
- return null;
- }
-
- @Override
- public Object execute(Object o) {
- return null;
- }
-
- /**
- * Method of getting integrated results
- *
- * @param input query string
- * @return a hash map where the string is a related term, and double is the
- * similarity to the input query
- */
- public Map<String, Double> appyMajorRule(String input) {
- termList = new ArrayList<>();
- Map<String, Double> termsMap = new HashMap<>();
- Map<String, List<LinkedTerm>> map = new HashMap<>();
- try {
- map = aggregateRelatedTermsFromAllmodel(es.customAnalyzing(props.getProperty(INDEX_NAME), input));
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Error applying majority rule", e);
- }
-
- for (Entry<String, List<LinkedTerm>> entry : map.entrySet()) {
- List<LinkedTerm> list = entry.getValue();
- double sumModelWeight = 0;
- double tmp = 0;
- for (LinkedTerm element : list) {
- sumModelWeight += getModelweight(element.model);
-
- if (element.weight > tmp) {
- tmp = element.weight;
- }
- }
-
- double finalWeight = tmp + ((sumModelWeight - 2) * 0.05);
- if (finalWeight < 0) {
- finalWeight = 0;
- }
-
- if (finalWeight > 1) {
- finalWeight = 1;
- }
- termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight)));
- }
-
- return sortMapByValue(termsMap);
- }
-
- /**
- * Method of getting integrated results
- *
- * @param input query string
- * @param num the number of most related terms
- * @return a string of related terms along with corresponding similarities
- */
- public String getIngeratedList(String input, int num) {
- String output = "";
- Map<String, Double> sortedMap = appyMajorRule(input);
- int count = 0;
- for (Entry<String, Double> entry : sortedMap.entrySet()) {
- if (count < num) {
- output += entry.getKey() + " = " + entry.getValue() + ", ";
- }
- count++;
- }
- LOG.info("\n************************Integrated results***************************");
- LOG.info(output);
- return output;
- }
-
- /**
- * Method of getting integrated results
- *
- * @param input query string
- * @return a JSON object of related terms along with corresponding similarities
- */
- public JsonObject getIngeratedListInJson(String input) {
- Map<String, Double> sortedMap = appyMajorRule(input);
- int count = 0;
- Map<String, Double> trimmedMap = new LinkedHashMap<>();
- for (Entry<String, Double> entry : sortedMap.entrySet()) {
- if (!entry.getKey().contains("china")) {
- if (count < 10) {
- trimmedMap.put(entry.getKey(), entry.getValue());
- }
- count++;
- }
- }
-
- return mapToJson(trimmedMap);
- }
-
- /**
- * Method of aggregating terms from web logs, metadata, and ontology
- *
- * @param input query string
- * @return a hash map where the string is a related term, and the list is
- * the similarities from different sources
- */
- public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) {
- aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType"));
- aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType"));
- aggregateRelatedTerms(input, props.getProperty("metadataLinkageType"));
- aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType"));
-
- return termList.stream().collect(Collectors.groupingBy(w -> w.term));
- }
-
- public int getModelweight(String model) {
- if (model.equals(props.getProperty("userHistoryLinkageType"))) {
- return Integer.parseInt(props.getProperty("userHistory_w"));
- }
-
- if (model.equals(props.getProperty("clickStreamLinkageType"))) {
- return Integer.parseInt(props.getProperty("clickStream_w"));
- }
-
- if (model.equals(props.getProperty("metadataLinkageType"))) {
- return Integer.parseInt(props.getProperty("metadata_w"));
- }
-
- if (model.equals(props.getProperty("ontologyLinkageType"))) {
- return Integer.parseInt(props.getProperty("ontology_w"));
- }
-
- return 999999;
- }
-
- /**
- * Method of extracting the related term from a comma string
- *
- * @param str input string
- * @param input query string
- * @return related term contained in the input string
- */
- public String extractRelated(String str, String input) {
- String[] strList = str.split(",");
- if (input.equals(strList[0])) {
- return strList[1];
- } else {
- return strList[0];
- }
- }
-
- public void aggregateRelatedTerms(String input, String model) {
- //get the first 10 related terms
- SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("keywords", input)).addSort(WEIGHT, SortOrder.DESC).setSize(11)
- .execute().actionGet();
-
- LOG.info("\n************************ {} results***************************", model);
- for (SearchHit hit : usrhis.getHits().getHits()) {
- Map<String, Object> result = hit.getSource();
- String keywords = (String) result.get("keywords");
- String relatedKey = extractRelated(keywords, input);
-
- if (!relatedKey.equals(input)) {
- LinkedTerm lTerm = new LinkedTerm(relatedKey, (double) result.get(WEIGHT), model);
- LOG.info("( {} {} )", relatedKey, (double) result.get(WEIGHT));
- termList.add(lTerm);
- }
-
- }
- }
-
- /**
- * Method of querying related terms from ontology
- *
- * @param input input query
- * @param model source name
- */
- public void aggregateRelatedTermsSWEET(String input, String model) {
- SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
- .setSize(11).execute().actionGet();
- LOG.info("\n************************ {} results***************************", model);
- for (SearchHit hit : usrhis.getHits().getHits()) {
- Map<String, Object> result = hit.getSource();
- String conceptB = (String) result.get("concept_B");
- if (!conceptB.equals(input)) {
- LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), model);
- LOG.info("( {} {} )", conceptB, (double) result.get(WEIGHT));
- termList.add(lTerm);
- }
- }
- }
-
- /**
- * Method of sorting a map by value
- *
- * @param passedMap input map
- * @return sorted map
- */
- public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
- List<String> mapKeys = new ArrayList<>(passedMap.keySet());
- List<Double> mapValues = new ArrayList<>(passedMap.values());
- Collections.sort(mapValues, Collections.reverseOrder());
- Collections.sort(mapKeys, Collections.reverseOrder());
-
- LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
-
- Iterator<Double> valueIt = mapValues.iterator();
- while (valueIt.hasNext()) {
- Object val = valueIt.next();
- Iterator<String> keyIt = mapKeys.iterator();
-
- while (keyIt.hasNext()) {
- Object key = keyIt.next();
- String comp1 = passedMap.get(key).toString();
- String comp2 = val.toString();
-
- if (comp1.equals(comp2)) {
- passedMap.remove(key);
- mapKeys.remove(key);
- sortedMap.put((String) key, (Double) val);
- break;
- }
-
- }
-
- }
- return sortedMap;
- }
-
- /**
- * Method of converting hashmap to JSON
- *
- * @param word input query
- * @param wordweights a map from related terms to weights
- * @return converted JSON object
- */
- private JsonObject mapToJson(Map<String, Double> wordweights) {
- Gson gson = new Gson();
- JsonObject json = new JsonObject();
- List<JsonObject> nodes = new ArrayList<>();
-
- for (Entry<String, Double> entry : wordweights.entrySet()) {
- JsonObject node = new JsonObject();
- String key = entry.getKey();
- Double value = entry.getValue();
- node.addProperty("word", key);
- node.addProperty("weight", value);
- nodes.add(node);
- }
-
- JsonElement nodesElement = gson.toJsonTree(nodes);
- json.add("ontology", nodesElement);
-
- return json;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
deleted file mode 100644
index 7f2ba66..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes integration method of web log, ontology, and metdata
- * mining results.
- */
-package gov.nasa.jpl.mudrod.integration;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java
deleted file mode 100644
index ee7cbfa..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodConstants.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.main;
-
-import gov.nasa.jpl.mudrod.ontology.Ontology;
-
-/**
- * Class contains static constant keys and values relating to Mudrod
- * configuration properties. Property values are read from <a href=
- * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>
- */
-public interface MudrodConstants {
-
- public static final String CLEANUP_TYPE_PREFIX = "Cleanup_type_prefix";
-
- public static final String CLICK_STREAM_LINKAGE_TYPE = "clickStreamLinkageType";
-
- public static final String CLICK_STREAM_MATRIX_TYPE = "clickStreamMatrixType";
-
- public static final String CLICKSTREAM_SVD_DIM = "clickstreamSVDDimension";
-
- public static final String CLICKSTREAM_W = "clickStream_w";
-
- public static final String COMMENT_TYPE = "commentType";
-
- /** Defined on CLI */
- public static final String DATA_DIR = "dataDir";
-
- public static final String DOWNLOAD_F = "downloadf";
-
- public static final String DOWNLOAD_WEIGHT = "downloadWeight";
-
- public static final String ES_CLUSTER = "clusterName";
-
- public static final String ES_TRANSPORT_TCP_PORT = "ES_Transport_TCP_Port";
-
- public static final String ES_UNICAST_HOSTS = "ES_unicast_hosts";
-
- public static final String ES_HTTP_PORT = "ES_HTTP_port";
-
- public static final String ES_INDEX_NAME = "indexName";
-
- public static final String FTP_PREFIX = "ftpPrefix";
-
- public static final String FTP_TYPE_PREFIX = "FTP_type_prefix";
-
- public static final String HTTP_PREFIX = "httpPrefix";
-
- public static final String HTTP_TYPE_PREFIX = "HTTP_type_prefix";
-
- public static final String LOG_INDEX = "logIndexName";
-
- public static final String METADATA_LINKAGE_TYPE = "metadataLinkageType";
-
- public static final String METADATA_SVD_DIM = "metadataSVDDimension";
-
- public static final String METADATA_URL = "metadataurl";
-
- public static final String METADATA_W = "metadata_w";
-
- public static final String MINI_USER_HISTORY = "mini_userHistory";
-
- public static final String MUDROD = "mudrod";
-
- /** Defined on CLI */
- public static final String MUDROD_CONFIG = "MUDROD_CONFIG";
- /**
- * An {@link Ontology} implementation.
- */
- public static final String ONTOLOGY_IMPL = MUDROD + "ontology.implementation";
-
- public static final String ONTOLOGY_LINKAGE_TYPE = "ontologyLinkageType";
-
- public static final String ONTOLOGY_W = "ontology_w";
-
- public static final String PROCESS_TYPE = "processingType";
-
- /** Defined on CLI */
- public static final String RAW_METADATA_PATH = "raw_metadataPath";
-
- public static final String RAW_METADATA_TYPE = "raw_metadataType";
-
- public static final String SEARCH_F = "searchf";
-
- public static final String SENDING_RATE = "sendingrate";
-
- public static final String SESSION_PORT = "SessionPort";
-
- public static final String SESSION_STATS_PREFIX = "SessionStats_prefix";
-
- public static final String SESSION_URL = "SessionUrl";
-
- public static final String SPARK_APP_NAME = "spark.app.name";
-
- public static final String SPARK_MASTER = "spark.master";
- /**
- * Absolute local location of javaSVMWithSGDModel directory. This is typically
- * <code>file:///usr/local/mudrod/core/src/main/resources/javaSVMWithSGDModel</code>
- */
- public static final String SVM_SGD_MODEL = "svmSgdModel";
-
- public static final String TIMEGAP = "timegap";
-
- public static final String TIME_SUFFIX = "TimeSuffix";
-
- public static final String USE_HISTORY_LINKAGE_TYPE = "userHistoryLinkageType";
-
- public static final String USER_HISTORY_W = "userHistory_w";
-
- public static final String VIEW_F = "viewf";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java
deleted file mode 100644
index 27dba84..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/main/MudrodEngine.java
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.main;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryEngineAbstract;
-import gov.nasa.jpl.mudrod.discoveryengine.MetadataDiscoveryEngine;
-import gov.nasa.jpl.mudrod.discoveryengine.OntologyDiscoveryEngine;
-import gov.nasa.jpl.mudrod.discoveryengine.RecommendEngine;
-import gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.integration.LinkageIntegration;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.jdom2.Document;
-import org.jdom2.Element;
-import org.jdom2.JDOMException;
-import org.jdom2.input.SAXBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Properties;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import static gov.nasa.jpl.mudrod.main.MudrodConstants.DATA_DIR;
-
-/**
- * Main entry point for Running the Mudrod system. Invocation of this class is
- * tightly linked to the primary Mudrod configuration which can be located at
- * <a href=
- * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>.
- */
-public class MudrodEngine {
-
- private static final Logger LOG = LoggerFactory.getLogger(MudrodEngine.class);
- private Properties props = new Properties();
- private ESDriver es = null;
- private SparkDriver spark = null;
- private static final String LOG_INGEST = "logIngest";
- private static final String META_INGEST = "metaIngest";
- private static final String FULL_INGEST = "fullIngest";
- private static final String PROCESSING = "processingWithPreResults";
- private static final String ES_HOST = "esHost";
- private static final String ES_TCP_PORT = "esTCPPort";
- private static final String ES_HTTP_PORT = "esPort";
-
- /**
- * Public constructor for this class.
- */
- public MudrodEngine() {
- // default constructor
- }
-
- /**
- * Start the {@link ESDriver}. Should only be called after call to
- * {@link MudrodEngine#loadConfig()}
- *
- * @return fully provisioned {@link ESDriver}
- */
- public ESDriver startESDriver() {
- return new ESDriver(props);
- }
-
- /**
- * Start the {@link SparkDriver}. Should only be called after call to
- * {@link MudrodEngine#loadConfig()}
- *
- * @return fully provisioned {@link SparkDriver}
- */
- public SparkDriver startSparkDriver() {
- return new SparkDriver(props);
- }
-
- /**
- * Retreive the Mudrod configuration as a Properties Map containing K, V of
- * type String.
- *
- * @return a {@link java.util.Properties} object
- */
- public Properties getConfig() {
- return props;
- }
-
- /**
- * Retreive the Mudrod {@link ESDriver}
- *
- * @return the {@link ESDriver} instance.
- */
- public ESDriver getESDriver() {
- return this.es;
- }
-
- /**
- * Set the Elasticsearch driver for MUDROD
- *
- * @param es
- * an ES driver instance
- */
- public void setESDriver(ESDriver es) {
- this.es = es;
- }
-
- private InputStream locateConfig() {
-
- String configLocation = System.getenv(MudrodConstants.MUDROD_CONFIG) == null ? "" : System.getenv(MudrodConstants.MUDROD_CONFIG);
- File configFile = new File(configLocation);
-
- try {
- InputStream configStream = new FileInputStream(configFile);
- LOG.info("Loaded config file from " + configFile.getAbsolutePath());
- return configStream;
- } catch (IOException e) {
- LOG.info("File specified by environment variable " + MudrodConstants.MUDROD_CONFIG + "=\'" + configLocation + "\' could not be loaded. " + e.getMessage());
- }
-
- InputStream configStream = MudrodEngine.class.getClassLoader().getResourceAsStream("config.xml");
-
- if (configStream != null) {
- LOG.info("Loaded config file from {}", MudrodEngine.class.getClassLoader().getResource("config.xml").getPath());
- }
-
- return configStream;
- }
-
- /**
- * Load the configuration provided at <a href=
- * "https://github.com/mudrod/mudrod/blob/master/core/src/main/resources/config.xml">config.xml</a>.
- *
- * @return a populated {@link java.util.Properties} object.
- */
- public Properties loadConfig() {
- SAXBuilder saxBuilder = new SAXBuilder();
-
- InputStream configStream = locateConfig();
-
- Document document;
- try {
- document = saxBuilder.build(configStream);
- Element rootNode = document.getRootElement();
- List<Element> paraList = rootNode.getChildren("para");
-
- for (int i = 0; i < paraList.size(); i++) {
- Element paraNode = paraList.get(i);
- String attributeName = paraNode.getAttributeValue("name");
- if (MudrodConstants.SVM_SGD_MODEL.equals(attributeName)) {
- props.put(attributeName, decompressSVMWithSGDModel(paraNode.getTextTrim()));
- } else {
- props.put(attributeName, paraNode.getTextTrim());
- }
- }
- } catch (JDOMException | IOException e) {
- LOG.error("Exception whilst retrieving or processing XML contained within 'config.xml'!", e);
- }
- return getConfig();
-
- }
-
- private String decompressSVMWithSGDModel(String archiveName) throws IOException {
-
- URL scmArchive = getClass().getClassLoader().getResource(archiveName);
- if (scmArchive == null) {
- throw new IOException("Unable to locate " + archiveName + " as a classpath resource.");
- }
- File tempDir = Files.createTempDirectory("mudrod").toFile();
- assert tempDir.setWritable(true);
- File archiveFile = new File(tempDir, archiveName);
- FileUtils.copyURLToFile(scmArchive, archiveFile);
-
- // Decompress archive
- int BUFFER_SIZE = 512000;
- ZipInputStream zipIn = new ZipInputStream(new FileInputStream(archiveFile));
- ZipEntry entry;
- while ((entry = zipIn.getNextEntry()) != null) {
- File f = new File(tempDir, entry.getName());
- // If the entry is a directory, create the directory.
- if (entry.isDirectory() && !f.exists()) {
- boolean created = f.mkdirs();
- if (!created) {
- LOG.error("Unable to create directory '{}', during extraction of archive contents.", f.getAbsolutePath());
- }
- } else if (!entry.isDirectory()) {
- boolean created = f.getParentFile().mkdirs();
- if (!created && !f.getParentFile().exists()) {
- LOG.error("Unable to create directory '{}', during extraction of archive contents.", f.getParentFile().getAbsolutePath());
- }
- int count;
- byte data[] = new byte[BUFFER_SIZE];
- FileOutputStream fos = new FileOutputStream(new File(tempDir, entry.getName()), false);
- try (BufferedOutputStream dest = new BufferedOutputStream(fos, BUFFER_SIZE)) {
- while ((count = zipIn.read(data, 0, BUFFER_SIZE)) != -1) {
- dest.write(data, 0, count);
- }
- }
- }
- }
-
- return new File(tempDir, StringUtils.removeEnd(archiveName, ".zip")).toURI().toString();
- }
-
- /**
- * Preprocess and process logs {@link DiscoveryEngineAbstract} implementations
- * for weblog
- */
- public void startLogIngest() {
- DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
- wd.preprocess();
- wd.process();
- LOG.info("*****************logs have been ingested successfully******************");
- }
-
- /**
- * updating and analysing metadata to metadata similarity results
- */
- public void startMetaIngest() {
- DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark);
- md.preprocess();
- md.process();
-
- DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark);
- recom.preprocess();
- recom.process();
- LOG.info("Metadata has been ingested successfully.");
- }
-
- public void startFullIngest() {
- DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
- wd.preprocess();
- wd.process();
-
- DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark);
- md.preprocess();
- md.process();
-
- DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark);
- recom.preprocess();
- recom.process();
- LOG.info("Full ingest has finished successfully.");
- }
-
- /**
- * Only preprocess various {@link DiscoveryEngineAbstract} implementations for
- * weblog, ontology and metadata, linkage discovery and integration.
- */
- public void startProcessing() {
- DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark);
- wd.process();
-
- DiscoveryEngineAbstract od = new OntologyDiscoveryEngine(props, es, spark);
- od.preprocess();
- od.process();
-
- DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark);
- md.preprocess();
- md.process();
-
- LinkageIntegration li = new LinkageIntegration(props, es, spark);
- li.execute();
-
- DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark);
- recom.process();
- }
-
- /**
- * Close the connection to the {@link ESDriver} instance.
- */
- public void end() {
- if (es != null) {
- es.close();
- }
- }
-
- /**
- * Main program invocation. Accepts one argument denoting location (on disk)
- * to a log file which is to be ingested. Help will be provided if invoked
- * with incorrect parameters.
- *
- * @param args
- * {@link java.lang.String} array contaning correct parameters.
- */
- public static void main(String[] args) {
- // boolean options
- Option helpOpt = new Option("h", "help", false, "show this help message");
-
- // log ingest (preprocessing + processing)
- Option logIngestOpt = new Option("l", LOG_INGEST, false, "begin log ingest");
- // metadata ingest (preprocessing + processing)
- Option metaIngestOpt = new Option("m", META_INGEST, false, "begin metadata ingest");
- // ingest both log and metadata
- Option fullIngestOpt = new Option("f", FULL_INGEST, false, "begin full ingest Mudrod workflow");
- // processing only, assuming that preprocessing results is in dataDir
- Option processingOpt = new Option("p", PROCESSING, false, "begin processing with preprocessing results");
-
- // argument options
- Option dataDirOpt = OptionBuilder.hasArg(true).withArgName("/path/to/data/directory").hasArgs(1).withDescription("the data directory to be processed by Mudrod").withLongOpt("dataDirectory")
- .isRequired().create(DATA_DIR);
-
- Option esHostOpt = OptionBuilder.hasArg(true).withArgName("host_name").hasArgs(1).withDescription("elasticsearch cluster unicast host").withLongOpt("elasticSearchHost").isRequired(false)
- .create(ES_HOST);
-
- Option esTCPPortOpt = OptionBuilder.hasArg(true).withArgName("port_num").hasArgs(1).withDescription("elasticsearch transport TCP port").withLongOpt("elasticSearchTransportTCPPort")
- .isRequired(false).create(ES_TCP_PORT);
-
- Option esPortOpt = OptionBuilder.hasArg(true).withArgName("port_num").hasArgs(1).withDescription("elasticsearch HTTP/REST port").withLongOpt("elasticSearchHTTPPort").isRequired(false)
- .create(ES_HTTP_PORT);
-
- // create the options
- Options options = new Options();
- options.addOption(helpOpt);
- options.addOption(logIngestOpt);
- options.addOption(metaIngestOpt);
- options.addOption(fullIngestOpt);
- options.addOption(processingOpt);
- options.addOption(dataDirOpt);
- options.addOption(esHostOpt);
- options.addOption(esTCPPortOpt);
- options.addOption(esPortOpt);
-
- CommandLineParser parser = new GnuParser();
- try {
- CommandLine line = parser.parse(options, args);
- String processingType = null;
-
- if (line.hasOption(LOG_INGEST)) {
- processingType = LOG_INGEST;
- } else if (line.hasOption(PROCESSING)) {
- processingType = PROCESSING;
- } else if (line.hasOption(META_INGEST)) {
- processingType = META_INGEST;
- } else if (line.hasOption(FULL_INGEST)) {
- processingType = FULL_INGEST;
- }
-
- String dataDir = line.getOptionValue(DATA_DIR).replace("\\", "/");
- if (!dataDir.endsWith("/")) {
- dataDir += "/";
- }
-
- MudrodEngine me = new MudrodEngine();
- me.loadConfig();
- me.props.put(DATA_DIR, dataDir);
-
- if (line.hasOption(ES_HOST)) {
- String esHost = line.getOptionValue(ES_HOST);
- me.props.put(MudrodConstants.ES_UNICAST_HOSTS, esHost);
- }
-
- if (line.hasOption(ES_TCP_PORT)) {
- String esTcpPort = line.getOptionValue(ES_TCP_PORT);
- me.props.put(MudrodConstants.ES_TRANSPORT_TCP_PORT, esTcpPort);
- }
-
- if (line.hasOption(ES_HTTP_PORT)) {
- String esHttpPort = line.getOptionValue(ES_HTTP_PORT);
- me.props.put(MudrodConstants.ES_HTTP_PORT, esHttpPort);
- }
-
- me.es = new ESDriver(me.getConfig());
- me.spark = new SparkDriver(me.getConfig());
- loadFullConfig(me, dataDir);
- if (processingType != null) {
- switch (processingType) {
- case PROCESSING:
- me.startProcessing();
- break;
- case LOG_INGEST:
- me.startLogIngest();
- break;
- case META_INGEST:
- me.startMetaIngest();
- break;
- case FULL_INGEST:
- me.startFullIngest();
- break;
- default:
- break;
- }
- }
- me.end();
- } catch (Exception e) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("MudrodEngine: 'dataDir' argument is mandatory. " + "User must also provide an ingest method.", options, true);
- LOG.error("Error whilst parsing command line.", e);
- }
- }
-
- private static void loadFullConfig(MudrodEngine me, String dataDir) {
- //TODO all of the properties defined below, which are determined are
- //runtime need to be added to MudrodConstants.java and referenced
- //accordingly and consistently from Properties.getProperty(MudrodConstant...);
- me.props.put("ontologyInputDir", dataDir + "SWEET_ocean/");
- me.props.put("oceanTriples", dataDir + "Ocean_triples.csv");
- me.props.put("userHistoryMatrix", dataDir + "UserHistoryMatrix.csv");
- me.props.put("clickstreamMatrix", dataDir + "ClickstreamMatrix.csv");
- me.props.put("metadataMatrix", dataDir + "MetadataMatrix.csv");
- me.props.put("clickstreamSVDMatrix_tmp", dataDir + "clickstreamSVDMatrix_tmp.csv");
- me.props.put("metadataSVDMatrix_tmp", dataDir + "metadataSVDMatrix_tmp.csv");
- me.props.put("raw_metadataPath", dataDir + me.props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
-
- me.props.put("jtopia", dataDir + "jtopiaModel");
- me.props.put("metadata_term_tfidf_matrix", dataDir + "metadata_term_tfidf.csv");
- me.props.put("metadata_word_tfidf_matrix", dataDir + "metadata_word_tfidf.csv");
- me.props.put("session_metadata_Matrix", dataDir + "metadata_session_coocurrence_matrix.csv");
-
- me.props.put("metadataOBCode", dataDir + "MetadataOHCode");
- me.props.put("metadata_topic", dataDir + "metadata_topic");
- me.props.put("metadata_topic_matrix", dataDir + "metadata_topic_matrix.csv");
- }
-
- /**
- * Obtain the spark implementation.
- *
- * @return the {@link SparkDriver}
- */
- public SparkDriver getSparkDriver() {
- return this.spark;
- }
-
- /**
- * Set the {@link SparkDriver}
- *
- * @param sparkDriver
- * a configured {@link SparkDriver}
- */
- public void setSparkDriver(SparkDriver sparkDriver) {
- this.spark = sparkDriver;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java
deleted file mode 100644
index 44d0518..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/main/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes the main entry point for running the Mudrod system.
- */
-package gov.nasa.jpl.mudrod.main;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java
deleted file mode 100644
index 1bb0a3c..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes metadata pre-processing, processing, and data structure
- * classes.
- */
-package gov.nasa.jpl.mudrod.metadata;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java
deleted file mode 100644
index bc7d187..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/ApiHarvester.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.metadata.pre;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.utils.HttpRequest;
-import org.apache.commons.io.IOUtils;
-import org.elasticsearch.action.index.IndexRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Properties;
-
-/**
- * ClassName: ApiHarvester Function: Harvest metadata from PO.DAACweb service.
- */
-public class ApiHarvester extends DiscoveryStepAbstract {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(ApiHarvester.class);
-
- /**
- * Creates a new instance of ApiHarvester.
- *
- * @param props the Mudrod configuration
- * @param es the Elasticsearch drive
- * @param spark the spark driver
- */
- public ApiHarvester(Properties props, ESDriver es, SparkDriver spark) {
- super(props, es, spark);
- }
-
- @Override
- public Object execute() {
- LOG.info("Starting Metadata harvesting.");
- startTime = System.currentTimeMillis();
- //remove old metadata from ES
- es.deleteType(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
- //harvest new metadata using PO.DAAC web services
- harvestMetadatafromWeb();
- es.createBulkProcessor();
- addMetadataMapping();
- importToES();
- es.destroyBulkProcessor();
- endTime = System.currentTimeMillis();
- es.refreshIndex();
- LOG.info("Metadata harvesting completed. Time elapsed: {}", (endTime - startTime) / 1000);
- return null;
- }
-
- /**
- * addMetadataMapping: Add mapping to index metadata in Elasticsearch. Please
- * invoke this method before import metadata to Elasticsearch.
- */
- public void addMetadataMapping() {
- String mappingJson = "{\r\n \"dynamic_templates\": " + "[\r\n " + "{\r\n \"strings\": " + "{\r\n \"match_mapping_type\": \"string\","
- + "\r\n \"mapping\": {\r\n \"type\": \"text\"," + "\r\n \"fielddata\": true," + "\r\n \"analyzer\": \"english\","
- + "\r\n \"fields\": {\r\n \"raw\": {" + "\r\n \"type\": \"string\"," + "\r\n \"index\": \"not_analyzed\"" + "\r\n }"
- + "\r\n }\r\n " + "\r\n }" + "\r\n }\r\n }\r\n ]\r\n}";
-
- es.getClient().admin().indices().preparePutMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME)).setType(props.getProperty(MudrodConstants.RAW_METADATA_TYPE)).setSource(mappingJson).execute()
- .actionGet();
- }
-
- /**
- * importToES: Index metadata into elasticsearch from local file directory.
- * Please make sure metadata have been harvest from web service before
- * invoking this method.
- */
- private void importToES() {
- File directory = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
- if(!directory.exists())
- directory.mkdir();
- File[] fList = directory.listFiles();
- for (File file : fList) {
- InputStream is;
- try {
- is = new FileInputStream(file);
- importSingleFileToES(is);
- } catch (FileNotFoundException e) {
- LOG.error("Error finding file!", e);
- }
-
- }
- }
-
- private void importSingleFileToES(InputStream is) {
- try {
- String jsonTxt = IOUtils.toString(is);
- JsonParser parser = new JsonParser();
- JsonElement item = parser.parse(jsonTxt);
- IndexRequest ir = new IndexRequest(props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE)).source(item.toString());
- es.getBulkProcessor().add(ir);
- } catch (IOException e) {
- LOG.error("Error indexing metadata record!", e);
- }
- }
-
- /**
- * harvestMetadatafromWeb: Harvest metadata from PO.DAAC web service.
- */
- private void harvestMetadatafromWeb() {
- LOG.info("Metadata download started.");
- int startIndex = 0;
- int doc_length = 0;
- JsonParser parser = new JsonParser();
- do {
- String searchAPI = "https://podaac.jpl.nasa.gov/api/dataset?startIndex=" + Integer.toString(startIndex) + "&entries=10&sortField=Dataset-AllTimePopularity&sortOrder=asc&id=&value=&search=";
- HttpRequest http = new HttpRequest();
- String response = http.getRequest(searchAPI);
-
- JsonElement json = parser.parse(response);
- JsonObject responseObject = json.getAsJsonObject();
- JsonArray docs = responseObject.getAsJsonObject("response").getAsJsonArray("docs");
-
- doc_length = docs.size();
-
- File file = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH));
- if (!file.exists()) {
- if (file.mkdir()) {
- LOG.info("Directory is created!");
- } else {
- LOG.error("Failed to create directory!");
- }
- }
- for (int i = 0; i < doc_length; i++) {
- JsonElement item = docs.get(i);
- int docId = startIndex + i;
- File itemfile = new File(props.getProperty(MudrodConstants.RAW_METADATA_PATH) + "/" + docId + ".json");
-
- try (FileWriter fw = new FileWriter(itemfile.getAbsoluteFile()); BufferedWriter bw = new BufferedWriter(fw);) {
- itemfile.createNewFile();
- bw.write(item.toString());
- } catch (IOException e) {
- LOG.error("Error writing metadata to local file!", e);
- }
- }
-
- startIndex += 10;
-
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- LOG.error("Error entering Elasticsearch Mappings!", e);
- Thread.currentThread().interrupt();
- }
-
- } while (doc_length != 0);
-
- LOG.info("Metadata downloading finished");
- }
-
- @Override
- public Object execute(Object o) {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java
deleted file mode 100644
index 63677e5..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/MatrixGenerator.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.metadata.pre;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.main.MudrodConstants;
-import gov.nasa.jpl.mudrod.metadata.structure.MetadataExtractor;
-import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
-import gov.nasa.jpl.mudrod.utils.MatrixUtil;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Generates a term-metadata matrix from the original metadata. Each row in
- * the matrix corresponds to a term, and each column to a metadata record.
- */
-public class MatrixGenerator extends DiscoveryStepAbstract {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(MatrixGenerator.class);
-
-  /**
-   * Creates a new instance of MatrixGenerator.
-   *
-   * @param props the Mudrod configuration
-   * @param es    the Elasticsearch driver
-   * @param spark the Spark driver
-   */
-  public MatrixGenerator(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  /**
-   * Generates a CSV file holding the term-metadata matrix built from the
-   * original metadata. The output location is read from the
-   * {@code metadataMatrix} property. Failures are logged and do not propagate.
-   *
-   * @return always {@code null}
-   * @see DiscoveryStepAbstract#execute()
-   */
-  @Override
-  public Object execute() {
-    LOG.info("Metadata matrix started");
-    startTime = System.currentTimeMillis();
-
-    String metadataMatrixFile = props.getProperty("metadataMatrix");
-    try {
-      MetadataExtractor extractor = new MetadataExtractor();
-      JavaPairRDD<String, List<String>> metadataTermsRDD = extractor.loadMetadata(this.es, this.spark.sc, props.getProperty(MudrodConstants.ES_INDEX_NAME), props.getProperty(MudrodConstants.RAW_METADATA_TYPE));
-      LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataTermsRDD);
-      MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, metadataMatrixFile);
-
-    } catch (Exception e) {
-      // Pass the exception as the trailing argument with no placeholder so the
-      // logger prints its stack trace instead of leaving a literal "{}".
-      LOG.error("Error during Metadata matrix generation", e);
-    }
-
-    endTime = System.currentTimeMillis();
-    LOG.info("Metadata matrix finished time elapsed: {}s", (endTime - startTime) / 1000);
-    return null;
-  }
-
-  /**
-   * Not used by this step.
-   *
-   * @param o ignored
-   * @return always {@code null}
-   */
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java
deleted file mode 100644
index 60a2429..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/pre/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes metadata pre-processing functions.
- */
-package gov.nasa.jpl.mudrod.metadata.pre;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java
deleted file mode 100644
index 67dbf2d..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/MetadataAnalyzer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package gov.nasa.jpl.mudrod.metadata.process;
-
-import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
-import gov.nasa.jpl.mudrod.driver.ESDriver;
-import gov.nasa.jpl.mudrod.driver.SparkDriver;
-import gov.nasa.jpl.mudrod.semantics.SVDAnalyzer;
-import gov.nasa.jpl.mudrod.utils.LinkageTriple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * ClassName: MetadataAnalyzer
- * Function: Calculate semantic relationship of vocabularies extracted from
- * metadata.
- */
-public class MetadataAnalyzer extends DiscoveryStepAbstract implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(MetadataAnalyzer.class);
-
-  /**
-   * Creates a new instance of MetadataAnalyzer.
-   *
-   * @param props the Mudrod configuration
-   * @param es    the Elasticsearch driver
-   * @param spark the Spark driver
-   */
-  public MetadataAnalyzer(Properties props, ESDriver es, SparkDriver spark) {
-    super(props, es, spark);
-  }
-
-  /**
-   * Not used by this step.
-   *
-   * @param o ignored
-   * @return always {@code null}
-   */
-  @Override
-  public Object execute(Object o) {
-    return null;
-  }
-
-  /**
-   * Calculates the semantic relationship of vocabularies from a CSV file which
-   * is a term-metadata matrix, and saves the resulting triples to
-   * Elasticsearch. Failures are logged and do not propagate; the index is
-   * refreshed in either case.
-   *
-   * @return always {@code null}
-   * @see DiscoveryStepAbstract#execute()
-   */
-  @Override
-  public Object execute() {
-    try {
-      LOG.info("*****************Metadata Analyzer starts******************");
-      startTime = System.currentTimeMillis();
-
-      SVDAnalyzer analyzer = new SVDAnalyzer(props, es, spark);
-      int svdDimension = Integer.parseInt(props.getProperty("metadataSVDDimension"));
-      String metadataMatrixFile = props.getProperty("metadataMatrix");
-      String svdMatrixFileName = props.getProperty("metadataSVDMatrix_tmp");
-
-      analyzer.getSVDMatrix(metadataMatrixFile, svdDimension, svdMatrixFileName);
-      List<LinkageTriple> triples = analyzer.calTermSimfromMatrix(svdMatrixFileName);
-
-      analyzer.saveToES(triples, props.getProperty("indexName"), props.getProperty("metadataLinkageType"));
-
-    } catch (Exception e) {
-      // Report through the class logger rather than printing to stderr.
-      LOG.error("Error during metadata analysis", e);
-    }
-
-    endTime = System.currentTimeMillis();
-    es.refreshIndex();
-    LOG.info("*****************Metadata Analyzer ends******************Took {}s", (endTime - startTime) / 1000);
-    return null;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java
deleted file mode 100644
index c988f31..0000000
--- a/core/src/main/java/gov/nasa/jpl/mudrod/metadata/process/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License"); you
- * may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package includes metadata processing classes.
- */
-package gov.nasa.jpl.mudrod.metadata.process;
\ No newline at end of file