Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:35:01 UTC
[20/21] incubator-sdap-mudrod git commit: SDAP-1 Import all code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 0000000..c63d005
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,324 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS"
+ BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ express or implied. See the License for the specific language
+ governing permissions and limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>gov.nasa.jpl.mudrod</groupId>
+ <artifactId>mudrod-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>mudrod-core</artifactId>
+
+ <name>Mudrod :: Core</name>
+ <description>Core Mudrod library implementation.</description>
+
+ <properties>
+ <!-- This property is the name of the directory containing the SVM/SGD Model. It is used during build to create a zip file from this directory -->
+ <svmSgdModel.value>javaSVMWithSGDModel</svmSgdModel.value>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jdom</groupId>
+ <artifactId>jdom</artifactId>
+ </dependency>
+
+ <!-- Elasticsearch dependencies -->
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.elasticsearch.plugin</groupId>
+ <artifactId>transport-netty4-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch-spark-20_2.11</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.carrotsearch</groupId>
+ <artifactId>hppc</artifactId>
+ </dependency>
+ <!-- End of Elasticsearch dependencies -->
+
+ <!-- Spark dependencies -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib_2.11</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fommil.netlib</groupId>
+ <artifactId>all</artifactId>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+
+ <dependency>
+ <!-- Needed to run the Spark UI during log ingestion -->
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ <version>3.1.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <!-- End of Spark dependencies -->
+
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ <version>1.3.8</version>
+ </dependency>
+
+ <!-- Logging dependencies -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </dependency>
+ <!-- End of Logging dependencies -->
+
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>net.sf.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.jena</groupId>
+ <artifactId>jena-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ <filtering>true</filtering>
+ <excludes>
+ <exclude>${svmSgdModel.value}/**</exclude>
+ </excludes>
+ </resource>
+
+ <resource>
+ <directory>${project.build.directory}</directory>
+ <includes>
+ <include>${svmSgdModel.value}.zip</include>
+ </includes>
+ </resource>
+
+ <resource>
+ <directory>${basedir}/../</directory>
+ <targetPath>META-INF</targetPath>
+ <includes>
+ <include>LICENSE.txt</include>
+ <include>NOTICE.txt</include>
+ </includes>
+ </resource>
+ </resources>
+
+ <plugins>
+ <!-- generates the bin launchers -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.10</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ <programs>
+ <program>
+ <mainClass>gov.nasa.jpl.mudrod.main.MudrodEngine
+ </mainClass>
+ <name>mudrod-engine</name>
+ </program>
+ </programs>
+ </configuration>
+ </plugin>
+
+ <!-- Generates the distribution package -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>zipSVMWithSGDModel</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <appendAssemblyId>false</appendAssemblyId>
+ <tarLongFileMode>posix</tarLongFileMode>
+ <finalName>${svmSgdModel.value}</finalName>
+ <outputDirectory>${project.build.directory}
+ </outputDirectory>
+ <descriptors>
+ <descriptor>
+ ${basedir}/src/main/assembly/zipSVMWithSGDModel.xml
+ </descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+
+ <execution>
+ <id>generateDistribution</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <appendAssemblyId>false</appendAssemblyId>
+ <tarLongFileMode>posix</tarLongFileMode>
+ <descriptors>
+ <descriptor>
+ ${basedir}/src/main/assembly/bin.xml
+ </descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.0.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <manifestEntries>
+ <Main-Class>
+ gov.nasa.jpl.mudrod.main.MudrodEngine
+ </Main-Class>
+ <Build-Number>${implementation.build}
+ </Build-Number>
+ </manifestEntries>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <finalName>
+ ${project.artifactId}-uber-${project.version}
+ </finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>release</id>
+ <build>
+ <resources>
+ <resource>
+ <directory>${basedir}/../</directory>
+ <targetPath>
+ ${project.build.directory}/apidocs/META-INF
+ </targetPath>
+ <includes>
+ <include>LICENSE.txt</include>
+ <include>NOTICE.txt</include>
+ </includes>
+ </resource>
+ </resources>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/core/src/main/assembly/bin.xml b/core/src/main/assembly/bin.xml
new file mode 100644
index 0000000..7977d65
--- /dev/null
+++ b/core/src/main/assembly/bin.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS"
+ BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ express or implied. See the License for the specific language
+ governing permissions and limitations under the License.
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>bin</id>
+ <formats>
+ <format>tar.gz</format>
+ <format>zip</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <excludes>
+ <exclude>*.bat</exclude>
+ </excludes>
+ <lineEnding>unix</lineEnding>
+ <directoryMode>0755</directoryMode>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>*.bat</include>
+ </includes>
+ <lineEnding>dos</lineEnding>
+ <directoryMode>0755</directoryMode>
+ <fileMode>0644</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/assembly/zipSVMWithSGDModel.xml
----------------------------------------------------------------------
diff --git a/core/src/main/assembly/zipSVMWithSGDModel.xml b/core/src/main/assembly/zipSVMWithSGDModel.xml
new file mode 100644
index 0000000..aec117f
--- /dev/null
+++ b/core/src/main/assembly/zipSVMWithSGDModel.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS"
+ BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ express or implied. See the License for the specific language
+ governing permissions and limitations under the License.
+-->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>zipSVMWithSGDModel</id>
+ <baseDirectory>${svmSgdModel.value}</baseDirectory>
+ <formats>
+ <format>zip</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/${svmSgdModel.value}</directory>
+ <outputDirectory>.</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java
new file mode 100644
index 0000000..2de9cf6
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+public abstract class DiscoveryEngineAbstract extends MudrodAbstract implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public DiscoveryEngineAbstract(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ /**
+ * Abstract method for preprocessing
+ */
+ public abstract void preprocess();
+
+ /**
+ * Abstract method for processing
+ */
+ public abstract void process();
+
+ /**
+ * Abstract method for output
+ */
+ public abstract void output();
+}
\ No newline at end of file
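
A note on usage: DiscoveryEngineAbstract only fixes the preprocess/process/output lifecycle; concrete engines fill those methods with chains of processing steps. The following is a minimal sketch of a hypothetical subclass (the class name ExampleDiscoveryEngine is illustrative and not part of this commit):

package gov.nasa.jpl.mudrod.discoveryengine;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;

import java.util.Properties;

// Hypothetical engine, shown only to illustrate the DiscoveryEngineAbstract contract.
public class ExampleDiscoveryEngine extends DiscoveryEngineAbstract {

  private static final long serialVersionUID = 1L;

  public ExampleDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  @Override
  public void preprocess() {
    // Execute the steps that ingest and clean raw data.
  }

  @Override
  public void process() {
    // Execute the steps that compute similarities or rankings.
  }

  @Override
  public void output() {
    // Export results; the engines in this commit leave this method empty.
  }
}
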
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java
new file mode 100644
index 0000000..94e8094
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+
+import java.util.Properties;
+
+/**
+ * Generic abstract class for a single discovery engine step
+ */
+public abstract class DiscoveryStepAbstract extends MudrodAbstract {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public DiscoveryStepAbstract(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ /**
+ * Abstract method for step execution without a parameter
+ *
+ * @return An instance of Object
+ */
+ public abstract Object execute();
+
+ /**
+ * Abstract method for step execution with a parameter
+ *
+ * @param o an instance of Object
+ * @return An instance of Object
+ */
+ public abstract Object execute(Object o);
+
+}
\ No newline at end of file
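
No concrete step appears in this file, so the following is a minimal sketch of a hypothetical DiscoveryStepAbstract implementation (the class name ExampleCountStep is illustrative only); the engines in this commit instantiate such steps and call execute() on them:

package gov.nasa.jpl.mudrod.discoveryengine;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;

import java.util.Properties;

// Hypothetical step, shown only to illustrate the DiscoveryStepAbstract contract.
public class ExampleCountStep extends DiscoveryStepAbstract {

  private static final long serialVersionUID = 1L;

  public ExampleCountStep(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  @Override
  public Object execute() {
    // A real step would query Elasticsearch via this.es and/or run a Spark job via this.spark.
    return null;
  }

  @Override
  public Object execute(Object o) {
    // The parameterized variant lets a step consume the output of a previous step.
    return execute();
  }
}
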
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java
new file mode 100644
index 0000000..e994825
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.metadata.pre.ApiHarvester;
+import gov.nasa.jpl.mudrod.metadata.pre.MatrixGenerator;
+import gov.nasa.jpl.mudrod.metadata.process.MetadataAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Supports preprocessing and processing of metadata
+ */
+public class MetadataDiscoveryEngine extends DiscoveryEngineAbstract implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(MetadataDiscoveryEngine.class);
+
+ public MetadataDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ /**
+ * Method of preprocessing metadata
+ */
+ public void preprocess() {
+ LOG.info("Starting metadata preprocessing...");
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract harvester = new ApiHarvester(this.props, this.es, this.spark);
+ harvester.execute();
+
+ endTime = System.currentTimeMillis();
+ LOG.info("Finished metadata preprocessing. Time elapsed: {}s", (endTime - startTime) / 1000);
+ }
+
+ /**
+ * Method of processing metadata
+ */
+ public void process() {
+ LOG.info("Starting metadata processing...");
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract matrix = new MatrixGenerator(this.props, this.es, this.spark);
+ matrix.execute();
+
+ DiscoveryStepAbstract svd = new MetadataAnalyzer(this.props, this.es, this.spark);
+ svd.execute();
+
+ endTime = System.currentTimeMillis();
+ LOG.info("Finished metadata processing. Time elapsed: {}s", (endTime - startTime) / 1000);
+ }
+
+ public void output() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java
new file mode 100644
index 0000000..b3f1831
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.apache.commons.io.IOUtils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckForNull;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * The most generic base class of Mudrod, holding the shared configuration and the
+ * Elasticsearch and Spark drivers
+ */
+public abstract class MudrodAbstract implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MudrodAbstract.class);
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ protected Properties props = new Properties();
+ protected ESDriver es = null;
+ protected SparkDriver spark = null;
+ protected long startTime;
+ protected long endTime;
+
+ protected static final String ES_SETTINGS = "elastic_settings.json";
+ protected static final String ES_MAPPINGS = "elastic_mappings.json";
+
+ public MudrodAbstract(Properties props, ESDriver es, SparkDriver spark) {
+ this.props = props;
+ this.es = es;
+ this.spark = spark;
+
+ if (this.props != null) {
+ this.initMudrod();
+ }
+ }
+
+ /**
+ * Sets up the essential Elasticsearch index settings and mappings required for MUDROD to start
+ */
+ @CheckForNull
+ protected void initMudrod() {
+ InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS);
+ InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS);
+ JSONObject settingsJSON = null;
+ JSONObject mappingJSON = null;
+
+ try {
+ settingsJSON = new JSONObject(IOUtils.toString(settingsStream));
+ } catch (JSONException | IOException e1) {
+ LOG.error("Error reading Elasticsearch settings!", e1);
+ }
+
+ try {
+ mappingJSON = new JSONObject(IOUtils.toString(mappingsStream));
+ } catch (JSONException | IOException e1) {
+ LOG.error("Error reading Elasticsearch mappings!", e1);
+ }
+
+ try {
+ if (settingsJSON != null && mappingJSON != null) {
+ this.es.putMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME), settingsJSON.toString(), mappingJSON.toString());
+ }
+ } catch (IOException e) {
+ LOG.error("Error entering Elasticsearch Mappings!", e);
+ }
+ }
+
+ /**
+ * Get driver of Elasticsearch
+ *
+ * @return driver of Elasticsearch
+ */
+ public ESDriver getES() {
+ return this.es;
+ }
+
+ /**
+ * Get configuration of MUDROD (read from configuration file)
+ *
+ * @return configuration of MUDROD
+ */
+ public Properties getConfig() {
+ return this.props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java
new file mode 100644
index 0000000..fe8066f
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.ontology.pre.AggregateTriples;
+import gov.nasa.jpl.mudrod.ontology.process.OntologyLinkCal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Supports preprocessing and processing of ontology data
+ */
+public class OntologyDiscoveryEngine extends DiscoveryEngineAbstract {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(OntologyDiscoveryEngine.class);
+
+ public OntologyDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ /**
+ * Method of preprocessing ontology
+ */
+ public void preprocess() {
+ LOG.info("*****************Ontology preprocessing starts******************");
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract at = new AggregateTriples(this.props, this.es, this.spark);
+ at.execute();
+
+ endTime = System.currentTimeMillis();
+ LOG.info("*****************Ontology preprocessing ends******************Took {}s", (endTime - startTime) / 1000);
+ }
+
+ /**
+ * Method of processing ontology
+ */
+ public void process() {
+ LOG.info("*****************Ontology processing starts******************");
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract ol = new OntologyLinkCal(this.props, this.es, this.spark);
+ ol.execute();
+
+ endTime = System.currentTimeMillis();
+ LOG.info("*****************Ontology processing ends******************Took {}s", (endTime - startTime) / 1000);
+ }
+
+ public void output() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java
new file mode 100644
index 0000000..9805b79
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java
@@ -0,0 +1,77 @@
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.recommendation.pre.ImportMetadata;
+import gov.nasa.jpl.mudrod.recommendation.pre.MetadataTFIDFGenerator;
+import gov.nasa.jpl.mudrod.recommendation.pre.NormalizeVariables;
+import gov.nasa.jpl.mudrod.recommendation.pre.SessionCooccurence;
+import gov.nasa.jpl.mudrod.recommendation.process.AbstractBasedSimilarity;
+import gov.nasa.jpl.mudrod.recommendation.process.VariableBasedSimilarity;
+import gov.nasa.jpl.mudrod.recommendation.process.sessionBasedCF;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class RecommendEngine extends DiscoveryEngineAbstract {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(RecommendEngine.class);
+
+ public RecommendEngine(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ LOG.info("Started Mudrod Recommend Engine.");
+ }
+
+ @Override
+ public void preprocess() {
+ LOG.info("*****************Recommendation preprocessing starts******************");
+
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract harvester = new ImportMetadata(this.props, this.es, this.spark);
+ harvester.execute();
+
+ DiscoveryStepAbstract tfidf = new MetadataTFIDFGenerator(this.props, this.es, this.spark);
+ tfidf.execute();
+
+ DiscoveryStepAbstract sessionMatrixGen = new SessionCooccurence(this.props, this.es, this.spark);
+ sessionMatrixGen.execute();
+
+ DiscoveryStepAbstract transformer = new NormalizeVariables(this.props, this.es, this.spark);
+ transformer.execute();
+
+ endTime = System.currentTimeMillis();
+
+ LOG.info("*****************Recommendation preprocessing ends******************Took {}s {}", (endTime - startTime) / 1000);
+ }
+
+ @Override
+ public void process() {
+ LOG.info("*****************Recommendation processing starts******************");
+
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract tfCF = new AbstractBasedSimilarity(this.props, this.es, this.spark);
+ tfCF.execute();
+
+ DiscoveryStepAbstract cbCF = new VariableBasedSimilarity(this.props, this.es, this.spark);
+ cbCF.execute();
+
+ DiscoveryStepAbstract sbCF = new sessionBasedCF(this.props, this.es, this.spark);
+ sbCF.execute();
+
+ endTime = System.currentTimeMillis();
+
+ LOG.info("*****************Recommendation processing ends******************Took {}s {}", (endTime - startTime) / 1000);
+ }
+
+ @Override
+ public void output() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java
new file mode 100644
index 0000000..4657222
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import gov.nasa.jpl.mudrod.weblog.pre.*;
+import gov.nasa.jpl.mudrod.weblog.process.ClickStreamAnalyzer;
+import gov.nasa.jpl.mudrod.weblog.process.UserHistoryAnalyzer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Supports preprocessing and processing of web logs
+ */
+public class WeblogDiscoveryEngine extends DiscoveryEngineAbstract {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(WeblogDiscoveryEngine.class);
+ public String timeSuffix = null;
+
+ public WeblogDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ LOG.info("Started Mudrod Weblog Discovery Engine.");
+ }
+
+ /**
+ * Get log file list from a directory
+ *
+ * @param logDir path to directory containing logs either local or in HDFS.
+ * @return a list of log files
+ */
+ public List<String> getFileList(String logDir) {
+
+ ArrayList<String> inputList = new ArrayList<>();
+ if (!logDir.startsWith("hdfs://")) {
+ File directory = new File(logDir);
+ File[] fList = directory.listFiles();
+ for (File file : fList) {
+ if (file.isFile() && file.getName().matches(".*\\d+.*") && file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
+ inputList.add(file.getName().replace(props.getProperty(MudrodConstants.HTTP_PREFIX), ""));
+ }
+ }
+ } else {
+ Configuration conf = new Configuration();
+ try (FileSystem fs = FileSystem.get(new URI(logDir), conf)) {
+ FileStatus[] fileStatus;
+ fileStatus = fs.listStatus(new Path(logDir));
+ for (FileStatus status : fileStatus) {
+ String path1 = status.getPath().toString();
+ if (path1.matches(".*\\d+.*") && path1.contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
+
+ String time = path1.substring(path1.lastIndexOf('.') + 1);
+ inputList.add(time);
+ }
+ }
+ } catch (IllegalArgumentException | IOException | URISyntaxException e) {
+ LOG.error("An error occured whilst obtaining the log file list.", e);
+ }
+ }
+
+ return inputList;
+ }
+
+ /**
+ * Method for preprocessing web logs and generating vocabulary similarity from
+ * them
+ */
+ @Override
+ public void preprocess() {
+ LOG.info("Starting Web log preprocessing.");
+
+ ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR));
+
+ for (int i = 0; i < inputList.size(); i++) {
+ timeSuffix = inputList.get(i);
+ props.put(MudrodConstants.TIME_SUFFIX, timeSuffix);
+ startTime = System.currentTimeMillis();
+ LOG.info("Processing logs dated {}", inputList.get(i));
+
+ DiscoveryStepAbstract im = new ImportLogFile(this.props, this.es, this.spark);
+ im.execute();
+
+ DiscoveryStepAbstract cd = new CrawlerDetection(this.props, this.es, this.spark);
+ cd.execute();
+
+ DiscoveryStepAbstract sg = new SessionGenerator(this.props, this.es, this.spark);
+ sg.execute();
+
+ DiscoveryStepAbstract ss = new SessionStatistic(this.props, this.es, this.spark);
+ ss.execute();
+
+ DiscoveryStepAbstract rr = new RemoveRawLog(this.props, this.es, this.spark);
+ rr.execute();
+
+ endTime = System.currentTimeMillis();
+
+ LOG.info("Web log preprocessing for logs dated {} complete. Time elapsed {} seconds.", inputList.get(i), (endTime - startTime) / 1000);
+ }
+
+ DiscoveryStepAbstract hg = new HistoryGenerator(this.props, this.es, this.spark);
+ hg.execute();
+
+ DiscoveryStepAbstract cg = new ClickStreamGenerator(this.props, this.es, this.spark);
+ cg.execute();
+
+ LOG.info("Web log preprocessing (user history and clickstream) complete.");
+ }
+
+ /**
+ * Method of web log ingest
+ */
+ public void logIngest() {
+ LOG.info("Starting Web log ingest.");
+ ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR));
+ for (int i = 0; i < inputList.size(); i++) {
+ timeSuffix = inputList.get(i);
+ props.put("TimeSuffix", timeSuffix);
+ DiscoveryStepAbstract im = new ImportLogFile(this.props, this.es, this.spark);
+ im.execute();
+ }
+
+ LOG.info("Web log ingest complete.");
+
+ }
+
+ /**
+ * Method of reconstructing user sessions from raw web logs
+ */
+ public void sessionRestruct() {
+ LOG.info("Starting Session reconstruction.");
+ ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR));
+ for (int i = 0; i < inputList.size(); i++) {
+ timeSuffix = inputList.get(i); // change timeSuffix dynamically
+ props.put(MudrodConstants.TIME_SUFFIX, timeSuffix);
+ DiscoveryStepAbstract cd = new CrawlerDetection(this.props, this.es, this.spark);
+ cd.execute();
+
+ DiscoveryStepAbstract sg = new SessionGenerator(this.props, this.es, this.spark);
+ sg.execute();
+
+ DiscoveryStepAbstract ss = new SessionStatistic(this.props, this.es, this.spark);
+ ss.execute();
+
+ DiscoveryStepAbstract rr = new RemoveRawLog(this.props, this.es, this.spark);
+ rr.execute();
+
+ endTime = System.currentTimeMillis();
+ }
+ LOG.info("Session reconstruction complete.");
+ }
+
+ @Override
+ public void process() {
+ LOG.info("Starting Web log processing.");
+ startTime = System.currentTimeMillis();
+
+ DiscoveryStepAbstract svd = new ClickStreamAnalyzer(this.props, this.es, this.spark);
+ svd.execute();
+
+ DiscoveryStepAbstract ua = new UserHistoryAnalyzer(this.props, this.es, this.spark);
+ ua.execute();
+
+ endTime = System.currentTimeMillis();
+ LOG.info("Web log processing complete. Time elaspsed {} seconds.", (endTime - startTime) / 1000);
+ }
+
+ @Override
+ public void output() {
+ // not implemented yet!
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java
new file mode 100644
index 0000000..21e528e
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the MUDROD abstract base classes for discovery steps and
+ * discovery engines. Workflow classes such as WeblogDiscoveryEngine,
+ * OntologyDiscoveryEngine, and MetadataDiscoveryEngine are also included here.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
\ No newline at end of file
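
The engines in this package are typically driven in sequence: construct each with shared Properties, ESDriver, and SparkDriver instances, then call preprocess() followed by process(). A hedged sketch of that wiring (MudrodEngine.loadConfig() and ESDriver(Properties) appear elsewhere in this commit; the SparkDriver(Properties) constructor is assumed here and may differ):

import gov.nasa.jpl.mudrod.discoveryengine.MetadataDiscoveryEngine;
import gov.nasa.jpl.mudrod.discoveryengine.RecommendEngine;
import gov.nasa.jpl.mudrod.discoveryengine.WeblogDiscoveryEngine;
import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;
import gov.nasa.jpl.mudrod.main.MudrodEngine;

import java.util.Properties;

public class DiscoveryWorkflowSketch {
  public static void main(String[] args) {
    Properties props = new MudrodEngine().loadConfig();
    ESDriver es = new ESDriver(props);
    SparkDriver spark = new SparkDriver(props); // assumed constructor signature

    WeblogDiscoveryEngine weblog = new WeblogDiscoveryEngine(props, es, spark);
    weblog.preprocess();
    weblog.process();

    MetadataDiscoveryEngine metadata = new MetadataDiscoveryEngine(props, es, spark);
    metadata.preprocess();
    metadata.process();

    RecommendEngine recommend = new RecommendEngine(props, es, spark);
    recommend.preprocess();
    recommend.process();

    es.close();
  }
}
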
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
new file mode 100644
index 0000000..d1af04a
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.driver;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import com.google.gson.GsonBuilder;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import gov.nasa.jpl.mudrod.main.MudrodEngine;
+import gov.nasa.jpl.mudrod.utils.ESTransportClient;
+import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
+import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Fuzziness;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.SuggestBuilder;
+import org.elasticsearch.search.suggest.SuggestBuilders;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Driver implementation for all Elasticsearch functionality.
+ */
+public class ESDriver implements Serializable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class);
+ private static final long serialVersionUID = 1L;
+ private transient Client client = null;
+ private transient Node node = null;
+ private transient BulkProcessor bulkProcessor = null;
+
+ /**
+ * Default constructor for this class. To load client configuration, call the
+ * parameterized constructor instead.
+ */
+ public ESDriver() {
+ // Default constructor, to load configuration call ESDriver(props)
+ }
+
+ /**
+ * Parameterized constructor which accepts a populated {@link java.util.Properties}
+ *
+ * @param props a populated properties object.
+ */
+ public ESDriver(Properties props) {
+ try {
+ setClient(makeClient(props));
+ } catch (IOException e) {
+ LOG.error("Error whilst constructing Elastcisearch client.", e);
+ }
+ }
+
+ public void createBulkProcessor() {
+ LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500);
+ setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!");
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!");
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ LOG.error("Bulk request has failed!");
+ throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure);
+ }
+ }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1)
+ .build());
+ }
+
+ public void destroyBulkProcessor() {
+ try {
+ getBulkProcessor().awaitClose(20, TimeUnit.MINUTES);
+ setBulkProcessor(null);
+ refreshIndex();
+ } catch (InterruptedException e) {
+ LOG.error("Error destroying the Bulk Processor.", e);
+ }
+ }
+
+ public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException {
+
+ boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists();
+ if (exists) {
+ return;
+ }
+
+ getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
+ getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet();
+ }
+
+ public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException {
+ return this.customAnalyzing(indexName, "cody", str);
+ }
+
+ public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException {
+ String[] strList = str.toLowerCase().split(",");
+ for (int i = 0; i < strList.length; i++) {
+ String tmp = "";
+ AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get();
+ for (AnalyzeToken token : r.getTokens()) {
+ tmp += token.getTerm() + " ";
+ }
+ strList[i] = tmp.trim();
+ }
+ return String.join(",", strList);
+ }
+
+ public List<String> customAnalyzing(String indexName, List<String> list) throws InterruptedException, ExecutionException {
+ if (list == null) {
+ return list;
+ }
+ int size = list.size();
+ List<String> customlist = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ customlist.add(this.customAnalyzing(indexName, list.get(i)));
+ }
+
+ return customlist;
+ }
+
+ public void deleteAllByQuery(String index, String type, QueryBuilder query) {
+ createBulkProcessor();
+ SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
+ .actionGet();
+
+ while (true) {
+ for (SearchHit hit : scrollResp.getHits().getHits()) {
+ DeleteRequest deleteRequest = new DeleteRequest(index, type, hit.getId());
+ getBulkProcessor().add(deleteRequest);
+ }
+
+ scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+ if (scrollResp.getHits().getHits().length == 0) {
+ break;
+ }
+
+ }
+ destroyBulkProcessor();
+ }
+
+ public void deleteType(String index, String type) {
+ this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery());
+ }
+
+ public List<String> getTypeListWithPrefix(Object object, Object object2) {
+ ArrayList<String> typeList = new ArrayList<>();
+ GetMappingsResponse res;
+ try {
+ res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get();
+ ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(object.toString());
+ for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
+ if (c.key.startsWith(object2.toString())) {
+ typeList.add(c.key);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e);
+ }
+ return typeList;
+ }
+
+ public List<String> getIndexListWithPrefix(Object object) {
+
+ LOG.info("Retrieving index list with prefix: {}", object.toString());
+ String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
+
+ ArrayList<String> indexList = new ArrayList<>();
+ int length = indices.length;
+ for (int i = 0; i < length; i++) {
+ String indexName = indices[i];
+ if (indexName.startsWith(object.toString())) {
+ indexList.add(indexName);
+ }
+ }
+
+ return indexList;
+ }
+
+ public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException {
+ return searchByQuery(index, type, query, false);
+ }
+
+ @SuppressWarnings("unchecked")
+ public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException {
+ boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
+ if (!exists) {
+ return null;
+ }
+
+ QueryBuilder qb = QueryBuilders.queryStringQuery(query);
+ SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet();
+
+ // Map of K,V pairs where the key is the field name from the search result and the value is the name that should be returned for that field. They are not always the same.
+ Map<String, String> fieldsToReturn = new HashMap<>();
+
+ fieldsToReturn.put("Dataset-ShortName", "Short Name");
+ fieldsToReturn.put("Dataset-LongName", "Long Name");
+ fieldsToReturn.put("DatasetParameter-Topic", "Topic");
+ fieldsToReturn.put("Dataset-Description", "Dataset-Description");
+ fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date");
+
+ if (bDetail) {
+ fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat");
+ fieldsToReturn.put("Dataset-Doi", "Dataset-Doi");
+ fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level");
+ fieldsToReturn.put("DatasetCitation-Version", "Version");
+ fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName");
+ fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName");
+ fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category");
+ fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath");
+ fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full");
+ fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full");
+ fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail");
+
+ fieldsToReturn.put("DatasetRegion-Region", "Region");
+ fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat");
+ fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat");
+ fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon");
+ fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon");
+ fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long");
+ fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong");
+
+ fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution");
+ fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat");
+ fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution");
+ fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution");
+ fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution");
+ fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution");
+ }
+
+ List<Map<String, Object>> searchResults = new ArrayList<>();
+
+ for (SearchHit hit : response.getHits().getHits()) {
+ Map<String, Object> source = hit.getSource();
+
+ Map<String, Object> searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue));
+
+ // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result
+
+ // Some results require special handling/formatting:
+ // Release Date formatting
+ LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList<String>) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate();
+ searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE));
+
+ if (bDetail) {
+
+ // DataFormat value, translate RAW to BINARY
+ if ("RAW".equals(searchResult.get("DataFormat"))) {
+ searchResult.put("DataFormat", "BINARY");
+ }
+
+ // DatasetLocationPolicy-BasePath Should only contain ftp, http, or https URLs
+ List<String> urls = ((List<String>) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList());
+ searchResult.put("DatasetLocationPolicy-BasePath", urls);
+
+ // Time Span Formatting
+ LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate();
+ LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ?
+ null :
+ Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate();
+ searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE)));
+
+ // Temporal resolution can come from one of two fields
+ searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution"));
+
+ // Special formatting for spatial resolution
+ String latResolution = (String) searchResult.get("Dataset-LatitudeResolution");
+ String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution");
+ if (!latResolution.isEmpty() && !lonResolution.isEmpty()) {
+ searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)");
+ } else {
+ String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution");
+ String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution");
+ double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000;
+ double dAlonResolution = Double.parseDouble(alonResolution) / 1000;
+ searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)");
+ }
+
+ // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies.
+ List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"),
+ (List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));
+
+ searchResult.put("Measurements", measurements);
+
+ }
+
+ searchResults.add(searchResult);
+ }
+
+ Map<String, List<?>> pdResults = new HashMap<>();
+ pdResults.put("PDResults", searchResults);
+
+ return new GsonBuilder().create().toJson(pdResults);
+ }
+
+ /**
+ * Builds a List of Measurement Hierarchies given the individual source lists.
+ * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail
+ * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete.
+ *
+ * For example, if the input is:
+ * <pre>
+ * topics = ["Oceans", "Oceans"]
+ * terms = ["Sea Surface Topography", "Ocean Waves"]
+ * variables = ["Sea Surface Height", "Significant Wave Height"]
+ * variableDetails = ["None", "None"]
+ * </pre>
+ *
+ * The output would be:
+ * <pre>
+ * [
+ * ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
+ * ["Oceans", "Ocean Waves", "Significant Wave Height"]
+ * ]
+ * </pre>
+ * Oceans > Sea Surface Topography > Sea Surface Height
+ * Oceans > Ocean Waves > Significant Wave Height
+ *
+ * @param topics List of topics, the first element of a measurement
+ * @param terms List of terms, the second element of a measurement
+ * @param variables List of variables, the third element of a measurement
+ * @param variableDetails List of variable details, the fourth element of a measurement
+ *
+ * @return A List where each element is a single hierarchy (as a List) built from the provided input lists.
+ */
+ private List<List<String>> buildMeasurementHierarchies(List<String> topics, List<String> terms, List<String> variables, List<String> variableDetails) {
+
+ List<List<String>> measurements = new ArrayList<>();
+
+ for (int x = 0; x < topics.size(); x++) {
+ measurements.add(new ArrayList<>());
+ measurements.get(x).add(topics.get(x));
+ // Only add the next 'level' if we can
+ if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) {
+ measurements.get(x).add(terms.get(x));
+ if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) {
+ measurements.get(x).add(variables.get(x));
+ if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) {
+ measurements.get(x).add(variableDetails.get(x));
+ }
+ }
+ }
+ }
+
+ return measurements;
+
+ }
+
+ public List<String> autoComplete(String index, String term) {
+ boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
+ if (!exists) {
+ return new ArrayList<>();
+ }
+
+ Set<String> suggestHS = new HashSet<String>();
+ List<String> suggestList = new ArrayList<>();
+
+ // please make sure that the completion field is configured in the ES mapping
+ CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100);
+ SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
+ SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet();
+
+ Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator();
+
+ while (iterator.hasNext()) {
+ Suggest.Suggestion.Entry.Option next = iterator.next();
+ String suggest = next.getText().string().toLowerCase();
+ suggestList.add(suggest);
+ }
+
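+ // De-duplicate the collected suggestions via the HashSet; note the suggester's ranking order is not preserved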
+ suggestHS.addAll(suggestList);
+ suggestList.clear();
+ suggestList.addAll(suggestHS);
+ return suggestList;
+ }
+
+ public void close() {
+ client.close();
+ }
+
+ public void refreshIndex() {
+ client.admin().indices().prepareRefresh().execute().actionGet();
+ }
+
+ /**
+ * Generates a TransportClient or NodeClient
+ *
+ * @param props a populated {@link java.util.Properties} object
+ * @return a constructed {@link org.elasticsearch.client.Client}
+ * @throws IOException if there is an error building the
+ * {@link org.elasticsearch.client.Client}
+ */
+ protected Client makeClient(Properties props) throws IOException {
+ String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER);
+ String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
+ String[] hosts = hostsString.split(",");
+ String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT);
+ int port = Integer.parseInt(portStr);
+
+ Settings.Builder settingsBuilder = Settings.builder();
+
+ // Set the cluster name and build the settings
+ if (clusterName != null && !clusterName.isEmpty())
+ settingsBuilder.put("cluster.name", clusterName);
+
+ settingsBuilder.put("http.type", "netty3");
+ settingsBuilder.put("transport.type", "netty3");
+
+ Settings settings = settingsBuilder.build();
+
+ Client client = null;
+
+ // Prefer TransportClient
+ if (hosts != null && port > 1) {
+ TransportClient transportClient = new ESTransportClient(settings);
+ for (String host : hosts)
+ transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+ client = transportClient;
+ } else if (clusterName != null) {
+ node = new Node(settings);
+ client = node.client();
+ }
+
+ return client;
+ }
+
+ /**
+ * Main method used to invoke the ESDriver implementation.
+ *
+ * @param args no arguments are required to invoke the Driver.
+ */
+ public static void main(String[] args) {
+ MudrodEngine mudrodEngine = new MudrodEngine();
+ ESDriver es = new ESDriver(mudrodEngine.loadConfig());
+ es.getTypeListWithPrefix("podaacsession", "sessionstats");
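+ // Illustrative follow-up (reuses the example index/type above): count the documents just listed, then release the client.
+ int sessionDocCount = es.getDocCount("podaacsession", "sessionstats");
+ System.out.println("podaacsession/sessionstats document count: " + sessionDocCount);
+ es.close();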
+ }
+
+ /**
+ * @return the client
+ */
+ public Client getClient() {
+ return client;
+ }
+
+ /**
+ * @param client the client to set
+ */
+ public void setClient(Client client) {
+ this.client = client;
+ }
+
+ /**
+ * @return the bulkProcessor
+ */
+ public BulkProcessor getBulkProcessor() {
+ return bulkProcessor;
+ }
+
+ /**
+ * @param bulkProcessor the bulkProcessor to set
+ */
+ public void setBulkProcessor(BulkProcessor bulkProcessor) {
+ this.bulkProcessor = bulkProcessor;
+ }
+
+ public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) {
+
+ UpdateRequest ur = null;
+ try {
+ ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
+ } catch (IOException e) {
+ LOG.error("Error whilst attempting to generate a new Update Request.", e);
+ }
+
+ return ur;
+ }
+
+ public UpdateRequest generateUpdateRequest(String index, String type, String id, Map<String, Object> fieldValueMap) {
+
+ UpdateRequest ur = null;
+ try {
+ XContentBuilder builder = jsonBuilder().startObject();
+ for (Entry<String, Object> entry : fieldValueMap.entrySet()) {
+ builder.field(entry.getKey(), entry.getValue());
+ }
+ builder.endObject();
+ ur = new UpdateRequest(index, type, id).doc(builder);
+ } catch (IOException e) {
+ LOG.error("Error whilst attempting to generate a new Update Request.", e);
+ }
+
+ return ur;
+ }
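+ /*
+ * A minimal usage sketch (the index, type, id, and field values below are illustrative only), given an ESDriver instance named es:
+ *
+ *   UpdateRequest ur = es.generateUpdateRequest("podaacsession", "sessionstats", "someDocId", "processingLevel", "processed");
+ *   es.getBulkProcessor().add(ur);
+ *   es.refreshIndex();
+ */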
+
+ public int getDocCount(String index, String... type) {
+ MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
+ String[] indexArr = new String[] { index };
+ return this.getDocCount(indexArr, type, search);
+ }
+
+ public int getDocCount(String[] index, String[] type) {
+ MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
+ return this.getDocCount(index, type, search);
+ }
+
+ public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) {
+ SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0);
+ SearchResponse countSr = countSrBuilder.execute().actionGet();
+ int docCount = (int) countSr.getHits().getTotalHits();
+ return docCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
new file mode 100644
index 0000000..e49c029
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.driver;
+
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.SQLContext;
+
+import java.io.File;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.util.Properties;
+//import org.apache.spark.sql.SparkSession;
+
+public class SparkDriver implements Serializable {
+
+ //TODO the commented-out code below is the API upgrade
+ //for Spark 2.0.0. It requires a large upgrade and simplification
+ //across the mudrod codebase, so it should be done in a separate ticket.
+ // /**
+ // *
+ // */
+ // private static final long serialVersionUID = 1L;
+ // private SparkSession builder;
+ //
+ // public SparkDriver() {
+ // builder = SparkSession.builder()
+ // .master("local[2]")
+ // .config("spark.hadoop.validateOutputSpecs", "false")
+ // .config("spark.files.overwrite", "true")
+ // .getOrCreate();
+ // }
+ //
+ // public SparkSession getBuilder() {
+ // return builder;
+ // }
+ //
+ // public void setBuilder(SparkSession builder) {
+ // this.builder = builder;
+ // }
+ //
+ // public void close() {
+ // builder.stop();
+ // }
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ public transient JavaSparkContext sc;
+ public transient SQLContext sqlContext;
+
+ public SparkDriver() {
+ // empty default constructor
+ }
+
+ public SparkDriver(Properties props) {
+ SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME, "MudrodSparkApp")).setIfMissing("spark.master", props.getProperty(MudrodConstants.SPARK_MASTER))
+ .set("spark.hadoop.validateOutputSpecs", "false").set("spark.files.overwrite", "true");
+
+ String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
+ String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT);
+
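+ // The es.nodes and es.port settings below configure the elasticsearch-hadoop connector used by elasticsearch-spark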
+ if (!"".equals(esHost)) {
+ conf.set("es.nodes", esHost);
+ }
+
+ if (!"".equals(esPort)) {
+ conf.set("es.port", esPort);
+ }
+
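+ // Use Kryo serialization for Spark and limit elasticsearch-hadoop bulk writes to 1500 entries per batch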
+ conf.set("spark.serializer", KryoSerializer.class.getName());
+ conf.set("es.batch.size.entries", "1500");
+
+ sc = new JavaSparkContext(conf);
+ sqlContext = new SQLContext(sc);
+ }
+
+ public void close() {
+ sc.sc().stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
new file mode 100644
index 0000000..d9cde36
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes commonly used Elasticsearch- and Spark-related
+ * functions.
+ */
+package gov.nasa.jpl.mudrod.driver;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
new file mode 100644
index 0000000..14b667d
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.integration;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * Supports integrating vocabulary similarity results from metadata, ontology, and web logs.
+ */
+public class LinkageIntegration extends DiscoveryStepAbstract {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LinkageIntegration.class);
+ private static final long serialVersionUID = 1L;
+ transient List<LinkedTerm> termList = new ArrayList<>();
+ DecimalFormat df = new DecimalFormat("#.00");
+ private static final String INDEX_NAME = "indexName";
+ private static final String WEIGHT = "weight";
+
+ public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) {
+ super(props, es, spark);
+ }
+
+ /**
+ * The data structure used to store a semantic triple (term, weight, source model).
+ */
+ class LinkedTerm {
+ String term = null;
+ double weight = 0;
+ String model = null;
+
+ public LinkedTerm(String str, double w, String m) {
+ term = str;
+ weight = w;
+ model = m;
+ }
+ }
+
+ /**
+ * Method of executing integration step
+ */
+ @Override
+ public Object execute() {
+ getIngeratedList("ocean wind", 11);
+ return null;
+ }
+
+ @Override
+ public Object execute(Object o) {
+ return null;
+ }
+
+ /**
+ * Method of integrating related term weights from all sources using the majority rule
+ *
+ * @param input query string
+ * @return a map from each related term to its integrated similarity to the input query
+ */
+ public Map<String, Double> appyMajorRule(String input) {
+ termList = new ArrayList<>();
+ Map<String, Double> termsMap = new HashMap<>();
+ Map<String, List<LinkedTerm>> map = new HashMap<>();
+ try {
+ map = aggregateRelatedTermsFromAllmodel(es.customAnalyzing(props.getProperty(INDEX_NAME), input));
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error applying majority rule", e);
+ }
+
+ for (Entry<String, List<LinkedTerm>> entry : map.entrySet()) {
+ List<LinkedTerm> list = entry.getValue();
+ double sumModelWeight = 0;
+ double tmp = 0;
+ for (LinkedTerm element : list) {
+ sumModelWeight += getModelweight(element.model);
+
+ if (element.weight > tmp) {
+ tmp = element.weight;
+ }
+ }
+
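+ // Boost the best per-source similarity by how strongly the sources agree:
+ // e.g. with illustrative model weights of 2 (user history) and 1 (click stream)
+ // and similarities 0.7 and 0.6, sumModelWeight = 3, tmp = 0.7, and the
+ // integrated weight becomes 0.7 + (3 - 2) * 0.05 = 0.75 (clamped to [0, 1]).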
+ double finalWeight = tmp + ((sumModelWeight - 2) * 0.05);
+ if (finalWeight < 0) {
+ finalWeight = 0;
+ }
+
+ if (finalWeight > 1) {
+ finalWeight = 1;
+ }
+ termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight)));
+ }
+
+ return sortMapByValue(termsMap);
+ }
+
+ /**
+ * Method of getting integrated results
+ *
+ * @param input query string
+ * @param num the maximum number of related terms to include
+ * @return a string of related terms along with corresponding similarities
+ */
+ public String getIngeratedList(String input, int num) {
+ String output = "";
+ Map<String, Double> sortedMap = appyMajorRule(input);
+ int count = 0;
+ for (Entry<String, Double> entry : sortedMap.entrySet()) {
+ if (count < num) {
+ output += entry.getKey() + " = " + entry.getValue() + ", ";
+ }
+ count++;
+ }
+ LOG.info("\n************************Integrated results***************************");
+ LOG.info(output);
+ return output;
+ }
+
+ /**
+ * Method of getting integrated results
+ *
+ * @param input query string
+ * @return a JSON object of related terms along with corresponding similarities
+ */
+ public JsonObject getIngeratedListInJson(String input) {
+ Map<String, Double> sortedMap = appyMajorRule(input);
+ int count = 0;
+ Map<String, Double> trimmedMap = new LinkedHashMap<>();
+ for (Entry<String, Double> entry : sortedMap.entrySet()) {
+ if (!entry.getKey().contains("china")) {
+ if (count < 10) {
+ trimmedMap.put(entry.getKey(), entry.getValue());
+ }
+ count++;
+ }
+ }
+
+ return mapToJson(trimmedMap);
+ }
+
+ /**
+ * Method of aggregating terms from web logs, metadata, and ontology
+ *
+ * @param input query string
+ * @return a map from each related term to the list of LinkedTerm records
+ * collected from the different sources
+ */
+ public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) {
+ aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType"));
+ aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType"));
+ aggregateRelatedTerms(input, props.getProperty("metadataLinkageType"));
+ aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType"));
+
+ return termList.stream().collect(Collectors.groupingBy(w -> w.term));
+ }
+
+ public int getModelweight(String model) {
+ if (model.equals(props.getProperty("userHistoryLinkageType"))) {
+ return Integer.parseInt(props.getProperty("userHistory_w"));
+ }
+
+ if (model.equals(props.getProperty("clickStreamLinkageType"))) {
+ return Integer.parseInt(props.getProperty("clickStream_w"));
+ }
+
+ if (model.equals(props.getProperty("metadataLinkageType"))) {
+ return Integer.parseInt(props.getProperty("metadata_w"));
+ }
+
+ if (model.equals(props.getProperty("ontologyLinkageType"))) {
+ return Integer.parseInt(props.getProperty("ontology_w"));
+ }
+
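+ // Unknown model type: fall back to a large sentinel weight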
+ return 999999;
+ }
+
+ /**
+ * Method of extracting the related term from a comma-separated keyword pair
+ *
+ * @param str input string containing the keyword pair
+ * @param input query string
+ * @return the element of the pair that is not the query term
+ */
+ public String extractRelated(String str, String input) {
+ String[] strList = str.split(",");
+ if (input.equals(strList[0])) {
+ return strList[1];
+ } else {
+ return strList[0];
+ }
+ }
+
+ public void aggregateRelatedTerms(String input, String model) {
+ // get the top-ranked related terms (the query fetches up to 11 hits)
+ SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("keywords", input)).addSort(WEIGHT, SortOrder.DESC).setSize(11)
+ .execute().actionGet();
+
+ LOG.info("\n************************ {} results***************************", model);
+ for (SearchHit hit : usrhis.getHits().getHits()) {
+ Map<String, Object> result = hit.getSource();
+ String keywords = (String) result.get("keywords");
+ String relatedKey = extractRelated(keywords, input);
+
+ if (!relatedKey.equals(input)) {
+ LinkedTerm lTerm = new LinkedTerm(relatedKey, (double) result.get(WEIGHT), model);
+ LOG.info("( {} {} )", relatedKey, (double) result.get(WEIGHT));
+ termList.add(lTerm);
+ }
+
+ }
+ }
+
+ /**
+ * Method of querying related terms from ontology
+ *
+ * @param input input query
+ * @param model source name
+ */
+ public void aggregateRelatedTermsSWEET(String input, String model) {
+ SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
+ .setSize(11).execute().actionGet();
+ LOG.info("\n************************ {} results***************************", model);
+ for (SearchHit hit : usrhis.getHits().getHits()) {
+ Map<String, Object> result = hit.getSource();
+ String conceptB = (String) result.get("concept_B");
+ if (!conceptB.equals(input)) {
+ LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), model);
+ LOG.info("( {} {} )", conceptB, (double) result.get(WEIGHT));
+ termList.add(lTerm);
+ }
+ }
+ }
+
+ /**
+ * Method of sorting a map by value in descending order
+ *
+ * @param passedMap input map
+ * @return a new map whose entries are ordered by descending value
+ */
+ public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
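+ // Note: matched entries are removed from passedMap as they are copied into
+ // the result, so the caller's map is modified by this method.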
+ List<String> mapKeys = new ArrayList<>(passedMap.keySet());
+ List<Double> mapValues = new ArrayList<>(passedMap.values());
+ Collections.sort(mapValues, Collections.reverseOrder());
+ Collections.sort(mapKeys, Collections.reverseOrder());
+
+ LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
+
+ Iterator<Double> valueIt = mapValues.iterator();
+ while (valueIt.hasNext()) {
+ Object val = valueIt.next();
+ Iterator<String> keyIt = mapKeys.iterator();
+
+ while (keyIt.hasNext()) {
+ Object key = keyIt.next();
+ String comp1 = passedMap.get(key).toString();
+ String comp2 = val.toString();
+
+ if (comp1.equals(comp2)) {
+ passedMap.remove(key);
+ mapKeys.remove(key);
+ sortedMap.put((String) key, (Double) val);
+ break;
+ }
+
+ }
+
+ }
+ return sortedMap;
+ }
+
+ /**
+ * Method of converting a map of term weights to JSON
+ *
+ * @param wordweights a map from related terms to weights
+ * @return converted JSON object
+ */
+ private JsonObject mapToJson(Map<String, Double> wordweights) {
+ Gson gson = new Gson();
+ JsonObject json = new JsonObject();
+ List<JsonObject> nodes = new ArrayList<>();
+
+ for (Entry<String, Double> entry : wordweights.entrySet()) {
+ JsonObject node = new JsonObject();
+ String key = entry.getKey();
+ Double value = entry.getValue();
+ node.addProperty("word", key);
+ node.addProperty("weight", value);
+ nodes.add(node);
+ }
+
+ JsonElement nodesElement = gson.toJsonTree(nodes);
+ json.add("ontology", nodesElement);
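+ // e.g. {"ontology":[{"word":"sea surface topography","weight":0.9}, ...]} (values illustrative)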
+
+ return json;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
new file mode 100644
index 0000000..7f2ba66
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the integration method for web log, ontology, and metadata
+ * mining results.
+ */
+package gov.nasa.jpl.mudrod.integration;
\ No newline at end of file