You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:34:57 UTC
[16/21] incubator-sdap-mudrod git commit: SDAP-1 Import all code
under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/SResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/SResult.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/SResult.java
new file mode 100644
index 0000000..33b6233
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/SResult.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.ssearch.structure;
+
+import java.lang.reflect.Field;
+
+/**
+ * Data structure class for a single search result.
+ *
+ * <p>Besides the descriptive dataset attributes it carries the ranking feature
+ * values and their scores. The score fields named in {@link #rlist} are read
+ * and written reflectively by name through {@link #get(Object, String)} and
+ * {@link #set(Object, String, Object)}, which is why the fields are plain
+ * mutable members whose names must stay in sync with {@link #rlist}.
+ */
+public class SResult {
+  /** Names of the score fields that are exported and copied, in export order. */
+  public static final String rlist[] = { "term_score", "releaseDate_score", /*"versionNum_score",*/
+      "processingL_score", "allPop_score", "monthPop_score", "userPop_score"/*, "termAndv_score"*/ };
+  String shortName = null;
+  String longName = null;
+  String topic = null;
+  String description = null;
+  String relase_date = null;
+
+  public Double final_score = 0.0;
+  public Double term_score = 0.0;
+  public Double releaseDate_score = 0.0;
+  public Double versionNum_score = 0.0;
+  public Double processingL_score = 0.0;
+  public Double click_score = 0.0;
+  public Double allPop_score = 0.0;
+  public Double monthPop_score = 0.0;
+  public Double userPop_score = 0.0;
+  public Double termAndv_score = 0.0;
+  public Integer below = 0;
+
+  public Double Dataset_LongName_score = null;
+  public Double Dataset_Metadata_score = null;
+  public Double DatasetParameter_Term_score = null;
+  public Double DatasetSource_Source_LongName_score = null;
+  public Double DatasetSource_Sensor_LongName_score = null;
+
+  public String version = null;
+  public String processingLevel = null;
+  public String latency = null;
+  public String stopDateLong = null;
+  public String stopDateFormat = null;
+  public Double spatialR_Sat = null;
+  public Double spatialR_Grid = null;
+  public String temporalR = null;
+
+  public Double releaseDate = null;
+  public Double click = null;
+  public Double term = null;
+  public Double versionNum = null;
+  public Double processingL = null;
+  public Double allPop = null;
+  public Double monthPop = null;
+  public Double userPop = null;
+  public Double termAndv = null;
+
+  public Double Dataset_LongName = null;
+  public Double Dataset_Metadata = null;
+  public Double DatasetParameter_Term = null;
+  public Double DatasetSource_Source_LongName = null;
+  public Double DatasetSource_Sensor_LongName = null;
+
+  public Double prediction = 0.0;
+  public String label = null;
+
+  //add by quintinali
+  public String startDate;
+  public String endDate;
+  public String sensors;
+
+  /**
+   * @param shortName   short name of dataset
+   * @param longName    long name of dataset
+   * @param topic       topic of dataset
+   * @param description description of dataset
+   * @param date        release date of dataset
+   */
+  public SResult(String shortName, String longName, String topic, String description, String date) {
+    this.shortName = shortName;
+    this.longName = longName;
+    this.topic = topic;
+    this.description = description;
+    this.relase_date = date;
+  }
+
+  /**
+   * Creates a result that carries over only the score fields listed in
+   * {@link #rlist} from another result. All other fields (names, label,
+   * raw feature values, ...) keep their default values.
+   *
+   * @param sr the result whose {@link #rlist} scores are copied
+   */
+  public SResult(SResult sr) {
+    for (String scoreField : rlist) {
+      set(this, scoreField, get(sr, scoreField));
+    }
+  }
+
+  /**
+   * Method of getting export header
+   *
+   * @param delimiter the delimiter used to separate strings
+   * @return header line: "ShortName", "below", every name in {@link #rlist}
+   *         and "label", separated by the delimiter and terminated by a newline
+   */
+  public static String getHeader(String delimiter) {
+    StringBuilder header = new StringBuilder();
+    header.append("ShortName").append(delimiter).append("below").append(delimiter);
+    for (String scoreField : rlist) {
+      header.append(scoreField).append(delimiter);
+    }
+    return header.append("label").append("\n").toString();
+  }
+
+  /**
+   * Method of get a search results as string
+   *
+   * <p>The columns match {@link #getHeader(String)}. A score field that is
+   * {@code null} is written as the text {@code null} instead of failing.
+   *
+   * @param delimiter the delimiter used to separate strings
+   * @return search result as string, terminated by a newline
+   */
+  public String toString(String delimiter) {
+    StringBuilder row = new StringBuilder();
+    row.append(shortName).append(delimiter).append(below).append(delimiter);
+    for (String scoreField : rlist) {
+      // Keep the boxed value: unboxing to double would throw a
+      // NullPointerException when a score has been set to null.
+      Object score = get(this, scoreField);
+      row.append(score).append(delimiter);
+    }
+    return row.append(label).append("\n").toString();
+  }
+
+  /**
+   * Generic setter method. Looks the field up on the object's class and then
+   * on each superclass in turn.
+   *
+   * @param object     instance of SResult
+   * @param fieldName  field name that needs to be set on
+   * @param fieldValue field value that needs to be set to
+   * @return {@code true} if a field with that name was found and set,
+   *         {@code false} if no class in the hierarchy declares it
+   * @throws IllegalStateException if the field exists but cannot be written
+   */
+  public static boolean set(Object object, String fieldName, Object fieldValue) {
+    Class<?> clazz = object.getClass();
+    while (clazz != null) {
+      try {
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(object, fieldValue);
+        return true;
+      } catch (NoSuchFieldException e) {
+        // not declared here, try the parent class
+        clazz = clazz.getSuperclass();
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Generic getter method. Looks the field up on the object's class and then
+   * on each superclass in turn.
+   *
+   * @param object    instance of SResult
+   * @param fieldName field name of search result
+   * @param <V>       data type
+   * @return the value of the field in the object, or {@code null} if no class
+   *         in the hierarchy declares a field with that name
+   * @throws IllegalStateException if the field exists but cannot be read
+   */
+  @SuppressWarnings("unchecked")
+  public static <V> V get(Object object, String fieldName) {
+    Class<?> clazz = object.getClass();
+    while (clazz != null) {
+      try {
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (V) field.get(object);
+      } catch (NoSuchFieldException e) {
+        // not declared here, try the parent class
+        clazz = clazz.getSuperclass();
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/package-info.java
new file mode 100644
index 0000000..a0f9ce5
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/ssearch/structure/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes data structure needed for ranking process
+ */
+package gov.nasa.jpl.mudrod.ssearch.structure;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/ESTransportClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/ESTransportClient.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/ESTransportClient.java
new file mode 100644
index 0000000..151ac8d
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/ESTransportClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gov.nasa.jpl.mudrod.utils;
+
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.reindex.ReindexPlugin;
+import org.elasticsearch.percolator.PercolatorPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.script.mustache.MustachePlugin;
+import org.elasticsearch.transport.Netty3Plugin;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link TransportClient} subclass that always registers a fixed set of
+ * plugins in addition to whatever plugins the caller supplies. The
+ * pre-installed plugins are {@link ReindexPlugin}, {@link PercolatorPlugin},
+ * {@link MustachePlugin} and {@link Netty3Plugin}; the original author notes
+ * that these are the elasticsearch core modules the client requires.
+ */
+@SuppressWarnings({ "unchecked", "varargs" })
+public class ESTransportClient extends TransportClient {
+
+ /** Plugins combined with the caller-supplied plugins by every constructor. */
+ private static final Collection<Class<? extends Plugin>> PRE_INSTALLED_PLUGINS = Collections
+ .unmodifiableList(Arrays.asList(ReindexPlugin.class, PercolatorPlugin.class, MustachePlugin.class, Netty3Plugin.class));
+
+ /**
+ * Creates a client from the given settings and a varargs list of additional
+ * plugins; delegates to the collection-based constructor.
+ *
+ * @param settings client settings
+ * @param plugins additional plugin classes to register
+ */
+ @SafeVarargs
+ public ESTransportClient(Settings settings, Class<? extends Plugin>... plugins) {
+ this(settings, Arrays.asList(plugins));
+ }
+
+ /**
+ * Creates a client from the given settings, registering the supplied plugins
+ * together with {@link #PRE_INSTALLED_PLUGINS}.
+ *
+ * @param settings client settings
+ * @param plugins additional plugin classes to register
+ */
+ public ESTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
+ // NOTE(review): addPlugins is not defined in this class, presumably inherited
+ // from TransportClient; the trailing null is presumably an optional listener
+ // argument of the super constructor -- confirm against the ES version in use.
+ super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), null);
+
+ }
+
+ /** Closes the client by delegating to the superclass implementation. */
+ @Override
+ public void close() {
+ super.close();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/HttpRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/HttpRequest.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/HttpRequest.java
new file mode 100644
index 0000000..be0a46d
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/HttpRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * ClassName: HttpRequest
+ * Function: Http request tool.
+ *
+ * <p>Issues a request against a URL and returns the first line of the response
+ * body. Failures are reported to the caller as small JSON strings instead of
+ * exceptions.
+ */
+public class HttpRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HttpRequest.class);
+
+  public HttpRequest() {
+  }
+
+  /**
+   * Requests the given URL and returns the first line of the response body.
+   *
+   * <p>Connect and read timeouts are 5 seconds each. The response body is
+   * decoded as UTF-8. The response stream is always closed and the connection
+   * released, whether or not the request succeeds.
+   *
+   * @param requestUrl the URL to request
+   * @return the first line of the response body (may be {@code null} if the
+   *         body is empty) when the status is 200;
+   *         <code>{"exception":"Service failed"}</code> for any other status;
+   *         <code>{"exception":"No service was found"}</code> if the request
+   *         could not be performed at all
+   */
+  public String getRequest(String requestUrl) {
+    HttpURLConnection connection = null;
+    try {
+      URL url = new URL(requestUrl);
+
+      connection = (HttpURLConnection) url.openConnection();
+      connection.setDoOutput(true);
+
+      connection.setConnectTimeout(5000);
+      connection.setReadTimeout(5000);
+      int code = connection.getResponseCode();
+      if (code != HttpURLConnection.HTTP_OK) {
+        String failure = "{\"exception\":\"Service failed\"}";
+        LOG.info(failure);
+        return failure;
+      }
+
+      // try-with-resources: the original never closed the stream or the reader
+      try (InputStream content = connection.getInputStream();
+          BufferedReader in = new BufferedReader(new InputStreamReader(content, java.nio.charset.StandardCharsets.UTF_8))) {
+        return in.readLine();
+      }
+    } catch (Exception e) {
+      String failure = "{\"exception\":\"No service was found\"}";
+      // pass the exception along so the cause is not lost from the log
+      LOG.error(failure, e);
+      return failure;
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/LabeledRowMatrix.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/LabeledRowMatrix.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/LabeledRowMatrix.java
new file mode 100644
index 0000000..d1d144b
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/LabeledRowMatrix.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+
+import java.util.List;
+
+/**
+ * ClassName: LabeledRowMatrix
+ * Function: LabeledRowMatrix struct. A plain holder that pairs a Spark
+ * {@link RowMatrix} with the labels of its rows and columns.
+ */
+public class LabeledRowMatrix {
+
+ // rowkeys: matrix row titles (words in MatrixUtil.createWordDocMatrix,
+ // documents in MatrixUtil.createDocWordMatrix)
+ public List<String> rowkeys;
+ // colkeys: matrix column titles (documents in MatrixUtil.createWordDocMatrix,
+ // words in MatrixUtil.createDocWordMatrix)
+ public List<String> colkeys;
+ // rowMatrix: the matrix itself; each row corresponds to an entry of rowkeys
+ // and each column to an entry of colkeys.
+ public RowMatrix rowMatrix;
+
+ public LabeledRowMatrix() {
+ // Intentionally empty: the public fields are assigned directly by callers.
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/LinkageTriple.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/LinkageTriple.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/LinkageTriple.java
new file mode 100644
index 0000000..90b1568
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/LinkageTriple.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * ClassName: LinkageTriple Function: Vocabulary linkage operations
+ *
+ * <p>A triple links two terms (A and B) with a weight. The static helpers
+ * write lists of triples to Elasticsearch and rescale stored weights.
+ */
+public class LinkageTriple implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  // keyAId: ID of term A
+  public long keyAId;
+  // keyBId: ID of term B
+  public long keyBId;
+  // weight: The relationship between term A and Term B
+  public double weight;
+  // keyA: TermA
+  public String keyA;
+  // keyB: TermB
+  public String keyB;
+  // df: Format number (two decimals). The symbols are pinned to Locale.ROOT so
+  // the decimal separator is always '.', otherwise Double.parseDouble fails on
+  // the formatted text in locales that use ','.
+  // NOTE(review): DecimalFormat is not thread-safe and this instance is shared.
+  public static DecimalFormat df = new DecimalFormat("#.00", java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));
+
+  public LinkageTriple() {
+    // Intentionally empty: fields are public and assigned directly by callers.
+  }
+
+  /**
+   * Output linkage triples in string format: {@code keyA,keyB:weight}.
+   *
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    return keyA + "," + keyB + ":" + weight;
+  }
+
+  /**
+   * Replaces the content of the given type with the triples, stored as a
+   * single "keywords" field per triple (no concept mapping, no symmetric
+   * copies).
+   *
+   * @param es      Elasticsearch driver
+   * @param triples triples to store, may be {@code null}
+   * @param index   index name
+   * @param type    type name
+   * @throws IOException if a document cannot be built
+   */
+  public static void insertTriples(ESDriver es, List<LinkageTriple> triples, String index, String type) throws IOException {
+    LinkageTriple.insertTriples(es, triples, index, type, false, false);
+  }
+
+  /**
+   * Deletes the given type and re-populates it with the triples through the
+   * driver's bulk processor.
+   *
+   * @param es        Elasticsearch driver
+   * @param triples   triples to store; when {@code null} the type is only
+   *                  deleted (and mapped, if requested)
+   * @param index     index name
+   * @param type      type name
+   * @param bTriple   when true, store "concept_A"/"concept_B" fields and add
+   *                  the matching mapping; otherwise store one "keywords"
+   *                  field "keyA,keyB". {@code null} is treated as false.
+   * @param bSymmetry when true (and bTriple is true), additionally store the
+   *                  mirrored triple (B, A) with the same weight
+   * @throws IOException if a document cannot be built
+   */
+  public static void insertTriples(ESDriver es, List<LinkageTriple> triples, String index, String type, Boolean bTriple, boolean bSymmetry) throws IOException {
+    // Unbox once and null-safely; "if (bTriple)" would throw on a null Boolean.
+    boolean asTriple = Boolean.TRUE.equals(bTriple);
+
+    es.deleteType(index, type);
+    if (asTriple) {
+      LinkageTriple.addMapping(es, index, type);
+    }
+
+    if (triples == null) {
+      return;
+    }
+
+    es.createBulkProcessor();
+    for (LinkageTriple triple : triples) {
+      double roundedWeight = roundWeight(triple.weight);
+
+      XContentBuilder source;
+      if (asTriple) {
+        source = conceptPairSource(triple.keyA, triple.keyB, roundedWeight);
+      } else {
+        source = jsonBuilder().startObject().field("keywords", triple.keyA + "," + triple.keyB).field("weight", roundedWeight).endObject();
+      }
+      es.getBulkProcessor().add(new IndexRequest(index, type).source(source));
+
+      if (asTriple && bSymmetry) {
+        XContentBuilder mirrored = conceptPairSource(triple.keyB, triple.keyA, roundedWeight);
+        es.getBulkProcessor().add(new IndexRequest(index, type).source(mirrored));
+      }
+    }
+    es.destroyBulkProcessor();
+  }
+
+  /**
+   * Puts a mapping on the type that declares "concept_A" and "concept_B" as
+   * not-analyzed string fields.
+   *
+   * @param es    Elasticsearch driver
+   * @param index index name
+   * @param type  type name
+   */
+  public static void addMapping(ESDriver es, String index, String type) {
+    try {
+      XContentBuilder mapping = jsonBuilder().startObject().startObject(type).startObject("properties")
+          .startObject("concept_A").field("type", "string").field("index", "not_analyzed").endObject()
+          .startObject("concept_B").field("type", "string").field("index", "not_analyzed").endObject()
+          .endObject().endObject().endObject();
+
+      es.getClient().admin().indices().preparePutMapping(index).setType(type).setSource(mapping).execute().actionGet();
+    } catch (IOException e) {
+      // Best effort, as in the original: a failure to build the mapping is
+      // reported but does not abort the caller.
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Rescales the stored weights of every concept so that the largest weight
+   * of each "concept_A" becomes 1: every weight is divided by the concept's
+   * maximum weight and written back through the bulk processor. Concepts
+   * whose maximum is already 1 are skipped.
+   *
+   * @param es    Elasticsearch driver
+   * @param index index name
+   * @param type  type name
+   * @throws IOException declared for callers; see {@link ESDriver}
+   */
+  public static void standardTriples(ESDriver es, String index, String type) throws IOException {
+    es.createBulkProcessor();
+
+    // NOTE(review): size(0) on a terms aggregation was the "return all buckets"
+    // idiom of older Elasticsearch releases -- confirm it is still accepted by
+    // the Elasticsearch version this project targets.
+    SearchResponse sr = es.getClient().prepareSearch(index).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+        .addAggregation(AggregationBuilders.terms("concepts").field("concept_A").size(0)).execute().actionGet();
+    Terms concepts = sr.getAggregations().get("concepts");
+
+    for (Terms.Bucket entry : concepts.getBuckets()) {
+      String concept = (String) entry.getKey();
+      double maxSim = LinkageTriple.getMaxSimilarity(es, index, type, concept);
+      if (maxSim == 1.0) {
+        continue;
+      }
+
+      SearchResponse scrollResp = es.getClient().prepareSearch(index).setTypes(type).setScroll(new TimeValue(60000)).setQuery(QueryBuilders.termQuery("concept_A", concept))
+          .addSort("weight", SortOrder.DESC).setSize(100).execute().actionGet();
+
+      while (true) {
+        for (SearchHit hit : scrollResp.getHits().getHits()) {
+          double newSim = weightOf(hit.getSource()) / maxSim;
+          UpdateRequest ur = es.generateUpdateRequest(index, type, hit.getId(), "weight", roundWeight(newSim));
+          es.getBulkProcessor().add(ur);
+        }
+
+        // NOTE(review): follow-up pages use a 600000 ms keep-alive while the
+        // first page uses 60000 ms; kept as in the original.
+        scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+        if (scrollResp.getHits().getHits().length == 0) {
+          break;
+        }
+      }
+    }
+
+    es.destroyBulkProcessor();
+  }
+
+  /**
+   * Returns the largest stored weight among the triples whose "concept_A"
+   * equals the given concept. Falls back to 1.0 when no triple is found or
+   * when the largest weight is 0, so the result is always safe to divide by.
+   */
+  private static double getMaxSimilarity(ESDriver es, String index, String type, String concept) {
+
+    double maxSim = 1.0;
+    SearchRequestBuilder builder = es.getClient().prepareSearch(index).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", concept)).addSort("weight", SortOrder.DESC).setSize(1);
+
+    SearchResponse usrhis = builder.execute().actionGet();
+    SearchHit[] hits = usrhis.getHits().getHits();
+    if (hits.length == 1) {
+      maxSim = weightOf(hits[0].getSource());
+    }
+
+    if (maxSim == 0.0) {
+      maxSim = 1.0;
+    }
+
+    return maxSim;
+  }
+
+  /** Rounds a weight to two decimals using {@link #df}. */
+  private static double roundWeight(double weight) {
+    return Double.parseDouble(df.format(weight));
+  }
+
+  /**
+   * Reads the "weight" field of a document source as a double. Goes through
+   * {@link Number} so any numeric type is accepted; a direct cast to double
+   * only works when the value happens to be a {@link Double}.
+   */
+  private static double weightOf(Map<String, Object> source) {
+    return ((Number) source.get("weight")).doubleValue();
+  }
+
+  /** Builds the JSON document for one (concept_A, concept_B, weight) triple. */
+  private static XContentBuilder conceptPairSource(String conceptA, String conceptB, double weight) throws IOException {
+    return jsonBuilder().startObject().field("concept_A", conceptA).field("concept_B", conceptB).field("weight", weight).endObject();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/MatrixUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/MatrixUtil.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/MatrixUtil.java
new file mode 100644
index 0000000..942f1e0
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/MatrixUtil.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.mllib.feature.IDF;
+import org.apache.spark.mllib.feature.IDFModel;
+import org.apache.spark.mllib.linalg.*;
+import org.apache.spark.mllib.linalg.distributed.IndexedRow;
+import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+import scala.Tuple2;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Matrix utility tool
+ */
+public class MatrixUtil {
+
+ // Private constructor: this class only exposes static helpers and is not
+ // meant to be instantiated.
+ private MatrixUtil() {
+ }
+
+ /**
+  * buildSVDMatrix: Generate SVD matrix from TF-IDF matrix. Please make sure
+  * the TF-IDF matrix has been already built from the original documents.
+  *
+  * @param tfidfMatrix
+  *          each row is a term and each column is a document name and each
+  *          cell is the TF-IDF value of the term in the corresponding
+  *          document.
+  * @param dimension
+  *          Column number of the SVD matrix; capped at the number of columns
+  *          of {@code tfidfMatrix}
+  * @return RowMatrix, each row is a term and each column is a dimension in the
+  *         feature space, each cell is value of the term in the corresponding
+  *         dimension.
+  */
+ public static RowMatrix buildSVDMatrix(RowMatrix tfidfMatrix, int dimension) {
+   // Never request more singular values than the matrix has columns.
+   int rank = Math.min(dimension, (int) tfidfMatrix.numCols());
+
+   SingularValueDecomposition<RowMatrix, Matrix> decomposition = tfidfMatrix.computeSVD(rank, true, 1.0E-9d);
+   // U * diag(s): scale every left-singular column by its singular value.
+   return decomposition.U().multiply(Matrices.diag(decomposition.s()));
+ }
+
+ /**
+  * buildSVDMatrix: Generate SVD matrix from Vector RDD.
+  *
+  * <p>Wraps the vectors in a {@link RowMatrix} and delegates to
+  * {@link #buildSVDMatrix(RowMatrix, int)}, so the requested dimension is
+  * capped at the number of columns here as well instead of being passed to
+  * the decomposition unchecked.
+  *
+  * @param vecRDD
+  *          vectors of terms in feature space
+  * @param dimension
+  *          Column number of the SVD matrix; capped at the number of columns
+  *          of the matrix built from {@code vecRDD}
+  * @return RowMatrix, each row is a term and each column is a dimension in the
+  *         feature space, each cell is value of the term in the corresponding
+  *         dimension.
+  */
+ public static RowMatrix buildSVDMatrix(JavaRDD<Vector> vecRDD, int dimension) {
+   return buildSVDMatrix(new RowMatrix(vecRDD.rdd()), dimension);
+ }
+
+ /**
+  * Create TF-IDF matrix from word-doc matrix.
+  *
+  * @param wordDocMatrix
+  *          each row is a term, each column is a document name and each cell
+  *          is number of the term in the corresponding document.
+  * @return RowMatrix, each row is a term and each column is a document name
+  *         and each cell is the TF-IDF value of the term in the corresponding
+  *         document.
+  */
+ public static RowMatrix createTFIDFMatrix(RowMatrix wordDocMatrix) {
+   JavaRDD<Vector> countVectors = wordDocMatrix.rows().toJavaRDD();
+   // Fit the IDF weights on the count vectors, then apply them to the same vectors.
+   IDFModel model = new IDF().fit(countVectors);
+   return new RowMatrix(model.transform(countVectors).rdd());
+ }
+
+ /**
+  * Create a word-document count matrix from doc-terms JavaPairRDD.
+  *
+  * @param uniqueDocRDD
+  *          doc-terms JavaPairRDD, in which each key is a doc name, and value
+  *          is term list extracted from that doc
+  * @return LabeledRowMatrix {@link LabeledRowMatrix} whose rows are words
+  *         ({@code rowkeys}), whose columns are documents ({@code colkeys})
+  *         and whose cells hold how often the word occurs in the document
+  */
+ public static LabeledRowMatrix createWordDocMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD) {
+   // Index documents with unique IDs
+   JavaPairRDD<List<String>, Long> corpus = uniqueDocRDD.values().zipWithIndex();
+   // Count the occurrences of every (word, docId) pair
+   JavaPairRDD<Tuple2<String, Long>, Double> worddocNumRDD = corpus.flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Long>, Tuple2<String, Long>, Double>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Iterator<Tuple2<Tuple2<String, Long>, Double>> call(Tuple2<List<String>, Long> docwords) throws Exception {
+       List<Tuple2<Tuple2<String, Long>, Double>> pairs = new ArrayList<>();
+       for (String word : docwords._1) {
+         Tuple2<String, Long> worddoc = new Tuple2<>(word, docwords._2);
+         pairs.add(new Tuple2<Tuple2<String, Long>, Double>(worddoc, 1.0));
+       }
+       return pairs.iterator();
+     }
+   }).reduceByKey(new Function2<Double, Double, Double>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Double call(Double first, Double second) throws Exception {
+       return first + second;
+     }
+   });
+   // Re-key by word: word -> ([docId], [count])
+   JavaPairRDD<String, Tuple2<List<Long>, List<Double>>> wordDocnumRDD = worddocNumRDD
+       .mapToPair(new PairFunction<Tuple2<Tuple2<String, Long>, Double>, String, Tuple2<List<Long>, List<Double>>>() {
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<Tuple2<String, Long>, Double> worddocNum) throws Exception {
+           List<Long> docs = new ArrayList<>();
+           docs.add(worddocNum._1._2);
+           List<Double> nums = new ArrayList<>();
+           nums.add(worddocNum._2);
+           Tuple2<List<Long>, List<Double>> docmums = new Tuple2<>(docs, nums);
+           return new Tuple2<>(worddocNum._1._1, docmums);
+         }
+       });
+   // Merge the per-document entries of each word and turn them into one sparse vector
+   final int corporsize = (int) uniqueDocRDD.keys().count();
+   JavaPairRDD<String, Vector> wordVectorRDD = wordDocnumRDD.reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<List<Long>, List<Double>> call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception {
+       arg0._1.addAll(arg1._1);
+       arg0._2.addAll(arg1._2);
+       return new Tuple2<>(arg0._1, arg0._2);
+     }
+   }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception {
+       List<Long> docIds = arg0._2._1;
+       List<Double> counts = arg0._2._2;
+
+       // Vectors.sparse documents that indices must be strictly increasing,
+       // but the lists arrive in whatever order the reduce merged them, so
+       // order the entries by document index first. Each document index
+       // occurs at most once per word because the counts were already reduced
+       // per (word, docId) pair above.
+       java.util.TreeMap<Integer, Double> byDocIndex = new java.util.TreeMap<>();
+       for (int i = 0; i < docIds.size(); i++) {
+         byDocIndex.put(docIds.get(i).intValue(), counts.get(i));
+       }
+
+       int[] indices = new int[byDocIndex.size()];
+       double[] values = new double[byDocIndex.size()];
+       int pos = 0;
+       for (java.util.Map.Entry<Integer, Double> cell : byDocIndex.entrySet()) {
+         indices[pos] = cell.getKey();
+         // doubleValue: the original went through intValue() and truncated
+         values[pos] = cell.getValue().doubleValue();
+         pos++;
+       }
+       Vector sv = Vectors.sparse(corporsize, indices, values);
+       return new Tuple2<>(arg0._1, sv);
+     }
+   });
+
+   RowMatrix wordDocMatrix = new RowMatrix(wordVectorRDD.values().rdd());
+
+   LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix();
+   labeledRowMatrix.rowMatrix = wordDocMatrix;
+   labeledRowMatrix.rowkeys = wordVectorRDD.keys().collect();
+   labeledRowMatrix.colkeys = uniqueDocRDD.keys().collect();
+   return labeledRowMatrix;
+ }
+
+ /**
+  * Create a document-word count matrix from doc-terms JavaPairRDD.
+  *
+  * @param uniqueDocRDD
+  *          doc-terms JavaPairRDD, in which each key is a doc name, and value
+  *          is term list extracted from that doc
+  * @param sc
+  *          spark context; not used by this method
+  * @return LabeledRowMatrix {@link LabeledRowMatrix} whose rows are documents
+  *         ({@code rowkeys}), whose columns are words ({@code colkeys}) and
+  *         whose cells hold how often the word occurs in the document
+  */
+ public static LabeledRowMatrix createDocWordMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD, JavaSparkContext sc) {
+   // Index word with unique IDs
+   JavaPairRDD<String, Long> wordIDRDD = uniqueDocRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Iterator<String> call(List<String> arg0) throws Exception {
+       return arg0.iterator();
+     }
+   }).distinct().zipWithIndex();
+
+   // Count the occurrences of every (doc, word) pair
+   JavaPairRDD<Tuple2<String, String>, Double> docwordNumRDD = uniqueDocRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, List<String>>, Tuple2<String, String>, Double>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Iterator<Tuple2<Tuple2<String, String>, Double>> call(Tuple2<String, List<String>> docwords) throws Exception {
+       List<Tuple2<Tuple2<String, String>, Double>> pairs = new ArrayList<>();
+       for (String word : docwords._2) {
+         Tuple2<String, String> worddoc = new Tuple2<>(docwords._1, word);
+         pairs.add(new Tuple2<Tuple2<String, String>, Double>(worddoc, 1.0));
+       }
+       return pairs.iterator();
+     }
+   }).reduceByKey(new Function2<Double, Double, Double>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Double call(Double first, Double second) throws Exception {
+       return first + second;
+     }
+   });
+
+   // Re-key by word so the counts can be joined with the word IDs: word -> (doc, count)
+   JavaPairRDD<String, Tuple2<String, Double>> wordDocnumRDD = docwordNumRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Double>, String, Tuple2<String, Double>>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<String, Tuple2<String, Double>> call(Tuple2<Tuple2<String, String>, Double> arg0) throws Exception {
+       Tuple2<String, Double> wordmums = new Tuple2<>(arg0._1._1, arg0._2);
+       return new Tuple2<>(arg0._1._2, wordmums);
+     }
+   });
+
+   // Attach the numeric word ID to every (doc, count) entry
+   JavaPairRDD<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> testRDD = wordDocnumRDD.leftOuterJoin(wordIDRDD);
+
+   final int wordsize = (int) wordIDRDD.count();
+   JavaPairRDD<String, Vector> docVectorRDD = testRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>>, String, Tuple2<List<Long>, List<Double>>>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> arg0) throws Exception {
+       // Re-key by document: doc -> ([wordId], [count]); a missing ID falls back to 0
+       Optional<Long> oid = arg0._2._2;
+       Long wordId = (long) 0;
+       if (oid.isPresent()) {
+         wordId = oid.get();
+       }
+
+       List<Long> word = new ArrayList<>();
+       word.add(wordId);
+
+       List<Double> count = new ArrayList<>();
+       count.add(arg0._2._1._2);
+
+       Tuple2<List<Long>, List<Double>> wordcount = new Tuple2<>(word, count);
+
+       return new Tuple2<>(arg0._2._1._1, wordcount);
+     }
+
+   }).reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<List<Long>, List<Double>> call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception {
+       arg0._1.addAll(arg1._1);
+       arg0._2.addAll(arg1._2);
+       return new Tuple2<>(arg0._1, arg0._2);
+     }
+   }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception {
+       List<Long> wordIds = arg0._2._1;
+       List<Double> counts = arg0._2._2;
+
+       // Vectors.sparse documents that indices must be strictly increasing,
+       // but the lists arrive in whatever order the reduce merged them, so
+       // order the entries by word index first.
+       java.util.TreeMap<Integer, Double> byWordIndex = new java.util.TreeMap<>();
+       for (int i = 0; i < wordIds.size(); i++) {
+         byWordIndex.put(wordIds.get(i).intValue(), counts.get(i));
+       }
+
+       int[] indices = new int[byWordIndex.size()];
+       double[] values = new double[byWordIndex.size()];
+       int pos = 0;
+       for (java.util.Map.Entry<Integer, Double> cell : byWordIndex.entrySet()) {
+         indices[pos] = cell.getKey();
+         // doubleValue: the original went through intValue() and truncated
+         values[pos] = cell.getValue().doubleValue();
+         pos++;
+       }
+       Vector sv = Vectors.sparse(wordsize, indices, values);
+       return new Tuple2<>(arg0._1, sv);
+     }
+   });
+
+   RowMatrix docwordMatrix = new RowMatrix(docVectorRDD.values().rdd());
+
+   LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix();
+   labeledRowMatrix.rowMatrix = docwordMatrix;
+   labeledRowMatrix.rowkeys = docVectorRDD.keys().collect();
+   labeledRowMatrix.colkeys = wordIDRDD.keys().collect();
+
+   return labeledRowMatrix;
+ }
+
+ /**
+  * loadVectorFromCSV: Load term vectors from a csv file.
+  *
+  * Every retained line is split on commas: the first field becomes the term
+  * and all remaining fields are parsed as doubles to form a dense vector.
+  *
+  * @param spark
+  *          spark instance
+  * @param csvFileName
+  *          csv matrix file
+  * @param skipNum
+  *          the number of leading rows of the csv file (for example header
+  *          rows) that are ignored
+  * @return JavaPairRDD, each key is a term, and value is the vector of the
+  *         term in feature space; null when no row is left after skipping
+  */
+ public static JavaPairRDD<String, Vector> loadVectorFromCSV(SparkDriver spark, String csvFileName, int skipNum) {
+   JavaRDD<String> importRDD = spark.sc.textFile(csvFileName);
+   // zipWithIndex numbers the lines from 0, so indexes 0 .. skipNum-1 are the
+   // rows to skip (header), important!
+   JavaPairRDD<String, Long> importIdRDD = importRDD.zipWithIndex().filter(new Function<Tuple2<String, Long>, Boolean>() {
+     /** */
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Boolean call(Tuple2<String, Long> v1) throws Exception {
+       return v1._2 > (skipNum - 1);
+     }
+   });
+
+   if (importIdRDD.count() == 0) {
+     return null;
+   }
+
+   return importIdRDD.mapToPair(new PairFunction<Tuple2<String, Long>, String, Vector>() {
+     /** */
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public Tuple2<String, Vector> call(Tuple2<String, Long> t) throws Exception {
+       String[] fields = t._1.split(",");
+       String word = fields[0];
+       int fieldsize = fields.length;
+       // Arrays.copyOfRange excludes its upper bound, so the range has to end
+       // at fieldsize (not fieldsize - 1) to keep the last numeric column.
+       String[] numfields = fieldsize < 2 ? new String[0] : Arrays.copyOfRange(fields, 1, fieldsize);
+
+       double[] nums = Stream.of(numfields).mapToDouble(Double::parseDouble).toArray();
+       Vector vec = Vectors.dense(nums);
+       return new Tuple2<>(word, vec);
+     }
+   });
+ }
+
+ /**
+ * Convert vectorRDD to indexed row matrix.
+ *
+ * @param vecs
+ * Vector RDD
+ * @return IndexedRowMatrix
+ */
+ public static IndexedRowMatrix buildIndexRowMatrix(JavaRDD<Vector> vecs) {
+ JavaRDD<IndexedRow> indexrows = vecs.zipWithIndex().map(new Function<Tuple2<Vector, Long>, IndexedRow>() {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IndexedRow call(Tuple2<Vector, Long> docId) {
+ return new IndexedRow(docId._2, docId._1);
+ }
+ });
+ return new IndexedRowMatrix(indexrows.rdd());
+ }
+
+ /**
+  * Transpose an indexed row matrix.
+  *
+  * @param indexedMatrix
+  *          spark indexed matrix
+  * @return row matrix in which every row corresponds to a column of the
+  *         original matrix and vice versa
+  */
+ public static RowMatrix transposeMatrix(IndexedRowMatrix indexedMatrix) {
+   // The transpose is taken on the coordinate form and then converted back to rows.
+   final RowMatrix transposed = indexedMatrix
+       .toCoordinateMatrix()
+       .transpose()
+       .toRowMatrix();
+   return transposed;
+ }
+
+ /**
+  * Output matrix to a CSV file.
+  *
+  * The first line is a header made of the literal " Num" followed by the
+  * quoted column names; every following line holds a row name followed by the
+  * values of that row. An existing file with the same name is replaced.
+  * Nothing is written when the matrix has no rows. An IOException is reported
+  * through printStackTrace and not rethrown.
+  *
+  * @param matrix
+  *          spark row matrix
+  * @param rowKeys
+  *          matrix row names
+  * @param colKeys
+  *          matrix column names
+  * @param fileName
+  *          csv file name
+  */
+ public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<String> colKeys, String fileName) {
+
+   if (matrix.rows().isEmpty()) {
+     return;
+   }
+
+   int rownum = (int) matrix.numRows();
+   int colnum = (int) matrix.numCols();
+   List<Vector> rows = matrix.rows().toJavaRDD().collect();
+
+   File file = new File(fileName);
+   if (file.exists()) {
+     file.delete();
+   }
+
+   // try-with-resources flushes and closes the writer even when a write fails,
+   // so no file handle leaks and no buffered output is lost silently.
+   try (BufferedWriter bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()))) {
+     // One reusable builder avoids quadratic string concatenation on wide matrices.
+     StringBuilder line = new StringBuilder(" Num");
+     for (int j = 0; j < colnum; j++) {
+       line.append(",\"").append(colKeys.get(j)).append('"');
+     }
+     bw.write(line.append('\n').toString());
+
+     for (int i = 0; i < rownum; i++) {
+       double[] rowValues = rows.get(i).toArray();
+       line.setLength(0);
+       line.append(rowKeys.get(i));
+       for (int j = 0; j < colnum; j++) {
+         line.append(',').append(rowValues[j]);
+       }
+       bw.write(line.append('\n').toString());
+     }
+   } catch (IOException e) {
+     e.printStackTrace();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/RDDUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/RDDUtil.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/RDDUtil.java
new file mode 100644
index 0000000..3fc45f4
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/RDDUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * ClassName: RDDUtil Function: common helper methods for Mudrod Spark RDDs
+ */
+public class RDDUtil {
+
+  public RDDUtil() {
+  }
+
+  /**
+   * getAllWordsInDoc: Collect the distinct terms found across all docs.
+   *
+   * @param docwordRDD Pair RDD, each key is a doc, and value is the term list
+   *                   extracted from that doc.
+   * @return RDD holding every term once
+   */
+  public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
+    // Flatten each per-doc term list into its individual terms.
+    FlatMapFunction<List<String>, String> termsOfDoc = List::iterator;
+    JavaRDD<List<String>> termLists = docwordRDD.values();
+    return termLists.flatMap(termsOfDoc).distinct();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/SVDUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/SVDUtil.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/SVDUtil.java
new file mode 100644
index 0000000..1982996
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/SVDUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import gov.nasa.jpl.mudrod.discoveryengine.MudrodAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Singular value decomposition helper: builds an SVD matrix from document-term
+ * data, derives a term similarity matrix from it and stores the resulting
+ * linkage triples in Elasticsearch.
+ */
+public class SVDUtil extends MudrodAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  // wordRDD: terms extracted from all documents
+  JavaRDD<String> wordRDD;
+  // svdMatrix: svd matrix
+  private RowMatrix svdMatrix;
+  // simMatrix: similarity matrix
+  private CoordinateMatrix simMatrix;
+
+  /**
+   * Creates a new instance of SVDUtil.
+   *
+   * @param config the Mudrod configuration
+   * @param es     the Elasticsearch driver
+   * @param spark  the spark driver
+   */
+  public SVDUtil(Properties config, ESDriver es, SparkDriver spark) {
+    super(config, es, spark);
+  }
+
+  /**
+   * Build SVD matrix from document-terms pairs. The result and the distinct
+   * terms of the input are also kept on this instance for the later steps.
+   *
+   * @param docwordRDD   JavaPairRDD, key is short name of data set and values are terms in
+   *                     the corresponding data set
+   * @param svdDimension Dimension of matrix after singular value decomposition
+   * @return row matrix
+   */
+  public RowMatrix buildSVDMatrix(JavaPairRDD<String, List<String>> docwordRDD, int svdDimension) {
+    LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(docwordRDD);
+    RowMatrix tfidfMatrix = MatrixUtil.createTFIDFMatrix(wordDocMatrix.rowMatrix);
+    this.svdMatrix = MatrixUtil.buildSVDMatrix(tfidfMatrix, svdDimension);
+    this.wordRDD = RDDUtil.getAllWordsInDoc(docwordRDD);
+    return this.svdMatrix;
+  }
+
+  /**
+   * Build svd matrix from CSV file. The first two rows of the file are skipped.
+   *
+   * @param tfidfCSVfile tf-idf matrix csv file
+   * @param svdDimension Dimension of matrix after singular value decomposition
+   * @return row matrix
+   * @throws IllegalArgumentException when the csv file yields no data rows
+   */
+  public RowMatrix buildSVDMatrix(String tfidfCSVfile, int svdDimension) {
+    JavaPairRDD<String, Vector> tfidfRDD = MatrixUtil.loadVectorFromCSV(spark, tfidfCSVfile, 2);
+    // loadVectorFromCSV returns null for a file without data rows; fail with a
+    // clear message instead of a bare NullPointerException on the next line.
+    if (tfidfRDD == null) {
+      throw new IllegalArgumentException("No tf-idf rows could be loaded from " + tfidfCSVfile);
+    }
+    JavaRDD<Vector> vectorRDD = tfidfRDD.values();
+
+    this.svdMatrix = MatrixUtil.buildSVDMatrix(vectorRDD, svdDimension);
+    this.wordRDD = tfidfRDD.keys();
+
+    return this.svdMatrix;
+  }
+
+  /**
+   * Calculate the similarity matrix from the svd matrix built by one of the
+   * buildSVDMatrix methods and keep it on this instance.
+   */
+  public void calSimilarity() {
+    this.simMatrix = SimilarityUtil.calculateSimilarityFromMatrix(svdMatrix);
+  }
+
+  /**
+   * Insert linkage triples to elasticsearch. An IOException raised while
+   * inserting is reported through printStackTrace and not rethrown.
+   *
+   * @param index index name
+   * @param type  linkage triple name
+   */
+  public void insertLinkageToES(String index, String type) {
+    List<LinkageTriple> triples = SimilarityUtil.matrixToTriples(wordRDD, simMatrix);
+    try {
+      LinkageTriple.insertTriples(es, triples, index, type);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/SimilarityUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/SimilarityUtil.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/SimilarityUtil.java
new file mode 100644
index 0000000..8ae9770
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/SimilarityUtil.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.utils;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
+import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
+import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * Similarity and distance calculation utilities
+ */
+public class SimilarityUtil {
+
+  public static final int SIM_COSINE = 3;
+  public static final int SIM_HELLINGER = 2;
+  public static final int SIM_PEARSON = 1;
+
+  /**
+   * CalSimilarityFromMatrix: Calculate term similarity from matrix.
+   *
+   * @param svdMatrix Each row is corresponding to a term, and each column is
+   *                  corresponding to a dimension of feature
+   * @return CoordinateMatrix, each row is corresponding to a term, and each
+   * column is also a term, the cell value is the similarity between the
+   * two terms
+   */
+  public static CoordinateMatrix calculateSimilarityFromMatrix(RowMatrix svdMatrix) {
+    JavaRDD<Vector> vecs = svdMatrix.rows().toJavaRDD();
+    return SimilarityUtil.calculateSimilarityFromVector(vecs);
+  }
+
+  /**
+   * CalSimilarityFromVector: Calculate term similarity from vector.
+   *
+   * @param vecs Each vector is corresponding to a term in the feature space.
+   * @return CoordinateMatrix, each row is corresponding to a term, and each
+   * column is also a term, the cell value is the similarity between the
+   * two terms
+   */
+  public static CoordinateMatrix calculateSimilarityFromVector(JavaRDD<Vector> vecs) {
+    // The similarity is taken between columns, so the term vectors (rows) are
+    // transposed into columns first.
+    IndexedRowMatrix indexedMatrix = MatrixUtil.buildIndexRowMatrix(vecs);
+    RowMatrix transposeMatrix = MatrixUtil.transposeMatrix(indexedMatrix);
+    return transposeMatrix.columnSimilarities();
+  }
+
+  /**
+   * Calculate term similarity from vector: every ordered pair of entries with
+   * different keys yields one triple.
+   *
+   * @param importRDD the {@link org.apache.spark.api.java.JavaPairRDD}
+   *                  data structure containing the vectors.
+   * @param simType   the similarity calculation to execute e.g.
+   *                  <ul>
+   *                  <li>{@link gov.nasa.jpl.mudrod.utils.SimilarityUtil#SIM_COSINE} - 3,</li>
+   *                  <li>{@link gov.nasa.jpl.mudrod.utils.SimilarityUtil#SIM_HELLINGER} - 2,</li>
+   *                  <li>{@link gov.nasa.jpl.mudrod.utils.SimilarityUtil#SIM_PEARSON} - 1</li>
+   *                  </ul>
+   *                  Only SIM_PEARSON and SIM_HELLINGER are evaluated here; any
+   *                  other value (including SIM_COSINE) leaves the weight at 0.0.
+   * @return a new {@link org.apache.spark.api.java.JavaRDD} of linkage triples
+   */
+  public static JavaRDD<LinkageTriple> calculateSimilarityFromVector(JavaPairRDD<String, Vector> importRDD, int simType) {
+    JavaRDD<Tuple2<String, Vector>> importRDD1 = importRDD.map(f -> new Tuple2<String, Vector>(f._1, f._2));
+    JavaPairRDD<Tuple2<String, Vector>, Tuple2<String, Vector>> cartesianRDD = importRDD1.cartesian(importRDD1);
+
+    return cartesianRDD.map(new Function<Tuple2<Tuple2<String, Vector>, Tuple2<String, Vector>>, LinkageTriple>() {
+
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public LinkageTriple call(Tuple2<Tuple2<String, Vector>, Tuple2<String, Vector>> arg) {
+        String keyA = arg._1._1;
+        String keyB = arg._2._1;
+
+        // Pairs of an entry with itself are marked with null and dropped by the filter below.
+        if (keyA.equals(keyB)) {
+          return null;
+        }
+
+        Vector vecA = arg._1._2;
+        Vector vecB = arg._2._2;
+        Double weight = 0.0;
+
+        if (simType == SimilarityUtil.SIM_PEARSON) {
+          weight = SimilarityUtil.pearsonDistance(vecA, vecB);
+        } else if (simType == SimilarityUtil.SIM_HELLINGER) {
+          weight = SimilarityUtil.hellingerDistance(vecA, vecB);
+        }
+
+        LinkageTriple triple = new LinkageTriple();
+        triple.keyA = keyA;
+        triple.keyB = keyB;
+        triple.weight = weight;
+        return triple;
+      }
+    }).filter(new Function<LinkageTriple, Boolean>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Boolean call(LinkageTriple arg0) throws Exception {
+        return arg0 != null;
+      }
+    });
+  }
+
+  /**
+   * MatrixtoTriples: Convert term similarity matrix to linkage triple list.
+   *
+   * @param keys      each key is a term
+   * @param simMatirx term similarity matrix, in which each row and column is a term and
+   *                  the cell value is the similarity between the two terms
+   * @return linkage triple list, or null when the number of matrix columns
+   * differs from the number of keys
+   */
+  public static List<LinkageTriple> matrixToTriples(JavaRDD<String> keys, CoordinateMatrix simMatirx) {
+    if (simMatirx.numCols() != keys.count()) {
+      return null;
+    }
+
+    // index words: (position of the key, key)
+    JavaPairRDD<Long, String> keyIdRDD = JavaPairRDD.fromJavaRDD(keys.zipWithIndex().map(new Function<Tuple2<String, Long>, Tuple2<Long, String>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<Long, String> call(Tuple2<String, Long> docId) {
+        return docId.swap();
+      }
+    }));
+
+    // one triple per matrix entry, keyed by its row index
+    JavaPairRDD<Long, LinkageTriple> entriesRowRDD = simMatirx.entries().toJavaRDD().mapToPair(new PairFunction<MatrixEntry, Long, LinkageTriple>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<Long, LinkageTriple> call(MatrixEntry t) throws Exception {
+        LinkageTriple triple = new LinkageTriple();
+        triple.keyAId = t.i();
+        triple.keyBId = t.j();
+        triple.weight = t.value();
+        return new Tuple2<>(triple.keyAId, triple);
+      }
+    });
+
+    // first join resolves the row index to keyA, then re-keys by the column index
+    JavaPairRDD<Long, LinkageTriple> entriesColRDD = entriesRowRDD.leftOuterJoin(keyIdRDD).values().mapToPair(new PairFunction<Tuple2<LinkageTriple, Optional<String>>, Long, LinkageTriple>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<Long, LinkageTriple> call(Tuple2<LinkageTriple, Optional<String>> t) throws Exception {
+        LinkageTriple triple = t._1;
+        Optional<String> stra = t._2;
+        if (stra.isPresent()) {
+          triple.keyA = stra.get();
+        }
+        return new Tuple2<>(triple.keyBId, triple);
+      }
+    });
+
+    // second join resolves the column index to keyB
+    JavaRDD<LinkageTriple> tripleRDD = entriesColRDD.leftOuterJoin(keyIdRDD).values().map(new Function<Tuple2<LinkageTriple, Optional<String>>, LinkageTriple>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public LinkageTriple call(Tuple2<LinkageTriple, Optional<String>> t) throws Exception {
+        LinkageTriple triple = t._1;
+        Optional<String> strb = t._2;
+        if (strb.isPresent()) {
+          triple.keyB = strb.get();
+        }
+        return triple;
+      }
+    });
+    return tripleRDD.collect();
+  }
+
+  /**
+   * Calculate a Hellinger-style distance between vectors: the sum of the
+   * squared differences of the element-wise square roots, divided by sqrt(2).
+   * Identical vectors give 0.
+   *
+   * NOTE(review): the textbook Hellinger distance takes the square root of
+   * that sum before dividing by sqrt(2); confirm whether the omission here is
+   * intended before relying on absolute values.
+   *
+   * @param vecA initial vector from which to calculate a similarity
+   * @param vecB second vector involved in similarity calculation; read at
+   *             every index of vecA
+   * @return distance between two vectors
+   */
+  public static double hellingerDistance(Vector vecA, Vector vecB) {
+    double[] arrA = vecA.toArray();
+    double[] arrB = vecB.toArray();
+
+    double sim = 0.0;
+
+    int arrsize = arrA.length;
+    for (int i = 0; i < arrsize; i++) {
+      double a = arrA[i];
+      double b = arrB[i];
+      double sqrtDiff = Math.sqrt(a) - Math.sqrt(b);
+      sim += sqrtDiff * sqrtDiff;
+    }
+
+    sim = sim / Math.sqrt(2);
+
+    return sim;
+  }
+
+  /**
+   * Calculate similarity between vectors from their positive entries: the
+   * number of positions that are positive in both vectors, divided by the
+   * geometric mean of the number of positive positions of each vector.
+   *
+   * @param vecA initial vector from which to calculate a similarity
+   * @param vecB second vector involved in similarity calculation; read at
+   *             every index of vecA
+   * @return similarity between two vectors; 0.0 when either vector has no
+   * positive entry
+   */
+  public static double pearsonDistance(Vector vecA, Vector vecB) {
+    double[] arrA = vecA.toArray();
+    double[] arrB = vecB.toArray();
+
+    int viewA = 0;
+    int viewB = 0;
+    int viewAB = 0;
+
+    int arrsize = arrA.length;
+    for (int i = 0; i < arrsize; i++) {
+      boolean positiveA = arrA[i] > 0;
+      boolean positiveB = arrB[i] > 0;
+      if (positiveA) {
+        viewA++;
+      }
+      if (positiveB) {
+        viewB++;
+      }
+      if (positiveA && positiveB) {
+        viewAB++;
+      }
+    }
+
+    // Without this guard the division below is 0 / 0.0 and yields NaN.
+    if (viewA == 0 || viewB == 0) {
+      return 0.0;
+    }
+    return viewAB / (Math.sqrt(viewA) * Math.sqrt(viewB));
+  }
+
+  /**
+   * Placeholder for a cosine based measure: the vectors are not inspected and
+   * the result is always 1.
+   *
+   * @param vecA initial vector from which to calculate a similarity
+   * @param vecB second vector involved in similarity calculation
+   * @return always 1
+   */
+  public static double cosineDistance(Vector vecA, Vector vecB) {
+    return 1;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/utils/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/utils/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/utils/package-info.java
new file mode 100644
index 0000000..3fcd95e
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes utility classes for calculating similarity and
+ * parsing HTTP requests
+ */
+package gov.nasa.jpl.mudrod.utils;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/package-info.java
new file mode 100644
index 0000000..f4a8b86
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes web log pre-processing, processing, and data structure
+ * classes.
+ */
+package gov.nasa.jpl.mudrod.weblog;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
new file mode 100644
index 0000000..4397873
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/KGreedyPartitionSolver.java
@@ -0,0 +1,142 @@
+package gov.nasa.jpl.mudrod.weblog.partition;
+
+import java.util.*;
+
+/**
+ * Greedy solver for the k-way partition problem: labels are visited one by
+ * one and each is put into the group whose running sum is currently smallest.
+ */
+public class KGreedyPartitionSolver implements ThePartitionProblemSolver {
+
+  // true when the map handed to solve() is already in the desired visiting
+  // order, so solve() must not sort it again
+  public boolean bsorted = false;
+
+  public KGreedyPartitionSolver() {
+    // default constructor
+  }
+
+  /**
+   * @param bsorted whether the input of {@link #solve(Map, int)} is already
+   *                ordered and should be used as is
+   */
+  public KGreedyPartitionSolver(boolean bsorted) {
+    // the flag used to be forced to true regardless of the argument
+    this.bsorted = bsorted;
+  }
+
+  /**
+   * Assign every label to one of k groups.
+   *
+   * @param labelNums map from label to its amount; it is not modified
+   * @param k         number of groups
+   * @return map from label to the index (0 .. k-1) of its group
+   * @throws IllegalArgumentException when k is not positive although there
+   *                                  are labels to distribute
+   */
+  @Override
+  public Map<String, Integer> solve(Map<String, Double> labelNums, int k) {
+    Map<String, Double> ordered = this.bsorted ? labelNums : this.sortMapByValue(labelNums);
+
+    if (k <= 0 && !ordered.isEmpty()) {
+      throw new IllegalArgumentException("k must be positive, but was " + k);
+    }
+
+    // Running sum per group; replaces re-summing every group for every label.
+    double[] groupSums = new double[Math.max(k, 0)];
+    Map<String, Integer> labelGroups = new HashMap<>();
+
+    for (Map.Entry<String, Double> entry : ordered.entrySet()) {
+      // The first group with the strictly smallest sum wins.
+      int lightest = 0;
+      for (int i = 1; i < groupSums.length; i++) {
+        if (groupSums[i] < groupSums[lightest]) {
+          lightest = i;
+        }
+      }
+      groupSums[lightest] += entry.getValue();
+      labelGroups.put(entry.getKey(), lightest);
+    }
+
+    return labelGroups;
+  }
+
+  /**
+   * Order a map by value, largest first; entries with equal values are
+   * ordered by key, largest first.
+   *
+   * @param passedMap map to order; it is left untouched
+   * @return new insertion-ordered map holding the entries in sorted order
+   */
+  public LinkedHashMap<String, Double> sortMapByValue(Map<String, Double> passedMap) {
+    List<Map.Entry<String, Double>> entries = new ArrayList<>(passedMap.entrySet());
+    Comparator<Map.Entry<String, Double>> byValueDesc = Map.Entry.<String, Double>comparingByValue(Comparator.<Double>reverseOrder());
+    Comparator<Map.Entry<String, Double>> byKeyDesc = Map.Entry.<String, Double>comparingByKey(Comparator.<String>reverseOrder());
+    entries.sort(byValueDesc.thenComparing(byKeyDesc));
+
+    LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
+    for (Map.Entry<String, Double> entry : entries) {
+      sortedMap.put(entry.getKey(), entry.getValue());
+    }
+    return sortedMap;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
new file mode 100644
index 0000000..11aaed3
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/ThePartitionProblemSolver.java
@@ -0,0 +1,8 @@
+package gov.nasa.jpl.mudrod.weblog.partition;
+
+import java.util.Map;
+
+/**
+ * Contract for solvers that distribute labelled amounts over a number of groups.
+ */
+public interface ThePartitionProblemSolver {
+
+  /**
+   * Distribute the given labels over groups.
+   *
+   * @param labelNums map from label to the amount attached to it
+   * @param k         number of groups asked for
+   * @return map from label to an integer identifying its group
+   */
+  Map<String, Integer> solve(Map<String, Double> labelNums, int k);
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java
new file mode 100644
index 0000000..4c299dd
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/partition/logPartitioner.java
@@ -0,0 +1,33 @@
+package gov.nasa.jpl.mudrod.weblog.partition;
+
+import org.apache.spark.Partitioner;
+
+import java.util.Map;
+
+/**
+ * Spark partitioner that sends a key to the partition recorded for it in a
+ * key-to-group map, and to a hash based partition when no group is recorded.
+ */
+public class logPartitioner extends Partitioner {
+
+  // number of partitions reported to Spark
+  int num;
+  // partition index per key; stays null when the single-argument constructor is used
+  Map<String, Integer> UserGroups;
+
+  /**
+   * @param num number of partitions
+   */
+  public logPartitioner(int num) {
+    this.num = num;
+  }
+
+  /**
+   * @param UserGroups partition index for every known key
+   * @param num        number of partitions
+   */
+  public logPartitioner(Map<String, Integer> UserGroups, int num) {
+    this.UserGroups = UserGroups;
+    this.num = num;
+  }
+
+  /**
+   * @param arg0 the key to place
+   * @return the partition recorded for the key in UserGroups; when there is no
+   * map or no entry for the key, a non-negative hash of the key modulo the
+   * number of partitions (0 for a null key or a non-positive partition count)
+   */
+  @Override
+  public int getPartition(Object arg0) {
+    Integer group = UserGroups == null ? null : UserGroups.get(arg0);
+    if (group != null) {
+      return group;
+    }
+    // Unboxing a missing entry used to throw a NullPointerException; fall
+    // back to hash partitioning instead.
+    if (arg0 == null || num <= 0) {
+      return 0;
+    }
+    return Math.floorMod(arg0.hashCode(), num);
+  }
+
+  @Override
+  public int numPartitions() {
+    return num;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java
new file mode 100644
index 0000000..34323df
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/ClickStreamGenerator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.pre;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.utils.LabeledRowMatrix;
+import gov.nasa.jpl.mudrod.utils.MatrixUtil;
+import gov.nasa.jpl.mudrod.weblog.structure.ClickStream;
+import gov.nasa.jpl.mudrod.weblog.structure.SessionExtractor;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Supports ability to extract click stream data based on session processing results
+ */
+public class ClickStreamGenerator extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ClickStreamGenerator.class);
+
+  public ClickStreamGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  /**
+   * Extract the click streams from Elasticsearch, build the word-document
+   * matrix from them and export that matrix to the CSV file named by the
+   * "clickstreamMatrix" property. The "downloadWeight" property is parsed as
+   * an integer and handed to the extractor. Any exception is logged and
+   * swallowed, so the step always completes.
+   *
+   * @return always null
+   */
+  @Override
+  public Object execute() {
+    LOG.info("Starting ClickStreamGenerator...");
+    startTime = System.currentTimeMillis();
+
+    String clickstreamMatrixFile = props.getProperty("clickstreamMatrix");
+    try {
+      SessionExtractor extractor = new SessionExtractor();
+      JavaRDD<ClickStream> clickstreamRDD = extractor.extractClickStreamFromES(this.props, this.es, this.spark);
+      int weight = Integer.parseInt(props.getProperty("downloadWeight"));
+      JavaPairRDD<String, List<String>> metadataQueryRDD = extractor.bulidDataQueryRDD(clickstreamRDD, weight);
+      LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataQueryRDD);
+
+      MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, clickstreamMatrixFile);
+    } catch (Exception e) {
+      // A trailing Throwable is logged as the exception itself, not as a
+      // format argument, so the message must not contain a {} placeholder.
+      LOG.error("Encountered error within ClickStreamGenerator", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    LOG.info("ClickStreamGenerator complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  /**
+   * Not used by this step.
+   *
+   * @param o ignored
+   * @return always null
+   */
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java
new file mode 100644
index 0000000..80bf33b
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/CrawlerDetection.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.pre;
+
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A {@link DiscoveryStepAbstract}
+ * implementation which detects a known list of Web crawlers which may be
+ * present within, and pollute, various logs acting as input to Mudrod.
+ */
+public class CrawlerDetection extends LogAbstract {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(CrawlerDetection.class);
+
+ public static final String CRAWLER = "crawler";
+ public static final String GOOGLE_BOT = "googlebot";
+ public static final String BING_BOT = "bingbot";
+ public static final String YAHOO_BOT = "slurp";
+ public static final String YACY_BOT = "yacybot";
+ public static final String ROGER_BOT = "rogerbot";
+ public static final String YANDEX_BOT = "yandexbot";
+
+ public static final String NO_AGENT_BOT = "-";
+ public static final String PERL_BOT = "libwww-perl/";
+ public static final String APACHE_HHTP = "apache-httpclient/";
+ public static final String JAVA_CLIENT = "java/";
+ public static final String CURL = "curl/";
+
+ /**
+ * Paramterized constructor to instantiate a configured instance of
+ * {@link CrawlerDetection}
+ *
+ * @param props populated {@link java.util.Properties} object
+ * @param es {@link ESDriver} object to use in
+ * crawler detection preprocessing.
+ * @param spark {@link SparkDriver} object to use in
+ * crawler detection preprocessing.
+ */
+ public CrawlerDetection(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ public CrawlerDetection() {
+ super(null, null, null);
+ }
+
+ @Override
+ public Object execute() {
+ LOG.info("Starting Crawler detection {}.", httpType);
+ startTime = System.currentTimeMillis();
+ try {
+ checkByRate();
+ } catch (InterruptedException | IOException e) {
+ LOG.error("Encountered an error whilst detecting Web crawlers.", e);
+ }
+ endTime = System.currentTimeMillis();
+ es.refreshIndex();
+ LOG.info("Crawler detection complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
+ return null;
+ }
+
+ /**
+ * Check known crawler through crawler agent name list
+ *
+ * @param agent name of a log line
+ * @return 1 if the log is initiated by crawler, 0 otherwise
+ */
+ public boolean checkKnownCrawler(String agent) {
+ agent = agent.toLowerCase();
+ if (agent.contains(CRAWLER) || agent.contains(GOOGLE_BOT) || agent.contains(BING_BOT) || agent.contains(APACHE_HHTP) || agent.contains(PERL_BOT) || agent.contains(YAHOO_BOT) || agent
+ .contains(YANDEX_BOT) || agent.contains(NO_AGENT_BOT) || agent.contains(PERL_BOT) || agent.contains(APACHE_HHTP) || agent.contains(JAVA_CLIENT) || agent.contains(CURL)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public void checkByRate() throws InterruptedException, IOException {
+ String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
+ if (processingType.equals("sequential")) {
+ checkByRateInSequential();
+ } else if (processingType.equals("parallel")) {
+ checkByRateInParallel();
+ }
+ }
+
+ /**
+ * Check crawler by request sending rate, which is read from configruation
+ * file
+ *
+ * @throws InterruptedException InterruptedException
+ * @throws IOException IOException
+ */
+ public void checkByRateInSequential() throws InterruptedException, IOException {
+ es.createBulkProcessor();
+
+ int rate = Integer.parseInt(props.getProperty("sendingrate"));
+
+ Terms users = this.getUserTerms(this.httpType);
+ LOG.info("Original User count: {}", Integer.toString(users.getBuckets().size()));
+
+ int userCount = 0;
+ for (Terms.Bucket entry : users.getBuckets()) {
+ String user = entry.getKey().toString();
+ int count = checkByRate(es, user);
+ userCount += count;
+ }
+ es.destroyBulkProcessor();
+ LOG.info("User count: {}", Integer.toString(userCount));
+ }
+
+ void checkByRateInParallel() throws InterruptedException, IOException {
+
+ JavaRDD<String> userRDD = getUserRDD(this.httpType);
+ LOG.info("Original User count: {}", userRDD.count());
+
+ int userCount = 0;
+ userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
+ ESDriver tmpES = new ESDriver(props);
+ tmpES.createBulkProcessor();
+ List<Integer> realUserNums = new ArrayList<>();
+ while (iterator.hasNext()) {
+ String s = iterator.next();
+ Integer realUser = checkByRate(tmpES, s);
+ realUserNums.add(realUser);
+ }
+ tmpES.destroyBulkProcessor();
+ tmpES.close();
+ return realUserNums.iterator();
+ }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
+
+ LOG.info("User count: {}", Integer.toString(userCount));
+ }
+
+ private int checkByRate(ESDriver es, String user) {
+
+ int rate = Integer.parseInt(props.getProperty("sendingrate"));
+ Pattern pattern = Pattern.compile("get (.*?) http/*");
+ Matcher matcher;
+
+ BoolQueryBuilder filterSearch = new BoolQueryBuilder();
+ filterSearch.must(QueryBuilders.termQuery("IP", user));
+
+ AggregationBuilder aggregation = AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC);
+ SearchResponse checkRobot = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet();
+
+ Histogram agg = checkRobot.getAggregations().get("by_minute");
+
+ List<? extends Histogram.Bucket> botList = agg.getBuckets();
+ long maxCount = botList.get(0).getDocCount();
+ if (maxCount >= rate) {
+ return 0;
+ } else {
+ DateTime dt1 = null;
+ int toLast = 0;
+ SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setScroll(new TimeValue(60000)).setQuery(filterSearch).setSize(100).execute().actionGet();
+ while (true) {
+ for (SearchHit hit : scrollResp.getHits().getHits()) {
+ Map<String, Object> result = hit.getSource();
+ String logtype = (String) result.get("LogType");
+ if (logtype.equals("PO.DAAC")) {
+ String request = (String) result.get("Request");
+ matcher = pattern.matcher(request.trim().toLowerCase());
+ boolean find = false;
+ while (matcher.find()) {
+ request = matcher.group(1);
+ result.put("RequestUrl", "http://podaac.jpl.nasa.gov" + request);
+ find = true;
+ }
+ if (!find) {
+ result.put("RequestUrl", request);
+ }
+ } else {
+ result.put("RequestUrl", result.get("Request"));
+ }
+
+ DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+ DateTime dt2 = fmt.parseDateTime((String) result.get("Time"));
+
+ if (dt1 == null) {
+ toLast = 0;
+ } else {
+ toLast = Math.abs(Seconds.secondsBetween(dt1, dt2).getSeconds());
+ }
+ result.put("ToLast", toLast);
+ IndexRequest ir = new IndexRequest(logIndex, cleanupType).source(result);
+
+ es.getBulkProcessor().add(ir);
+ dt1 = dt2;
+ }
+
+ scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ if (scrollResp.getHits().getHits().length == 0) {
+ break;
+ }
+ }
+
+ }
+
+ return 1;
+ }
+
+ @Override
+ public Object execute(Object o) {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java
new file mode 100644
index 0000000..d5dc102
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/weblog/pre/HistoryGenerator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.weblog.pre;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Supports ability to generate search history (queries) for each individual
+ * user (IP)
+ */
+public class HistoryGenerator extends LogAbstract {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(HistoryGenerator.class);
+
+ public HistoryGenerator(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ @Override
+ public Object execute() {
+ LOG.info("Starting HistoryGenerator...");
+ startTime = System.currentTimeMillis();
+
+ generateBinaryMatrix();
+
+ endTime = System.currentTimeMillis();
+ LOG.info("HistoryGenerator complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
+ return null;
+ }
+
+ /**
+ * Method to generate a binary user*query matrix (stored in temporary .csv
+ * file)
+ */
+ public void generateBinaryMatrix() {
+ try {
+
+ System.out.println(props.getProperty("userHistoryMatrix"));
+ File file = new File(props.getProperty("userHistoryMatrix"));
+ if (file.exists()) {
+ file.delete();
+ }
+
+ file.createNewFile();
+
+ FileWriter fw = new FileWriter(file.getAbsoluteFile());
+ BufferedWriter bw = new BufferedWriter(fw);
+ bw.write("Num" + ",");
+
+ // step 1: write first row of csv
+ List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+ String[] logIndices = logIndexList.toArray(new String[0]);
+ String[] statictypeArray = new String[] { this.sessionStats };
+ int docCount = es.getDocCount(logIndices, statictypeArray);
+
+ SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+ .addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet();
+ Terms ips = sr.getAggregations().get("IPs");
+ List<String> ipList = new ArrayList<>();
+ for (Terms.Bucket entry : ips.getBuckets()) {
+ if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) { // filter
+ // out
+ // less
+ // active users/ips
+ ipList.add(entry.getKey().toString());
+ }
+ }
+ bw.write(String.join(",", ipList) + "\n");
+
+ // step 2: step the rest rows of csv
+ SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+ .addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount)));
+
+ SearchResponse sr2 = sr2Builder.execute().actionGet();
+ Terms keywords = sr2.getAggregations().get("KeywordAgg");
+
+ for (Terms.Bucket keyword : keywords.getBuckets()) {
+
+ Map<String, Integer> ipMap = new HashMap<>();
+ Terms ipAgg = keyword.getAggregations().get("IPAgg");
+
+ int distinctUser = ipAgg.getBuckets().size();
+ if (distinctUser > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) {
+ bw.write(keyword.getKey() + ",");
+ for (Terms.Bucket IP : ipAgg.getBuckets()) {
+
+ ipMap.put(IP.getKey().toString(), 1);
+ }
+ for (int i = 0; i < ipList.size(); i++) {
+ if (ipMap.containsKey(ipList.get(i))) {
+ bw.write(ipMap.get(ipList.get(i)) + ",");
+ } else {
+ bw.write("0,");
+ }
+ }
+ bw.write("\n");
+ }
+ }
+
+ bw.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public Object execute(Object o) {
+ return null;
+ }
+
+}