Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:35:01 UTC

[20/21] incubator-sdap-mudrod git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
new file mode 100644
index 0000000..c63d005
--- /dev/null
+++ b/core/pom.xml
@@ -0,0 +1,324 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS"
+  BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+  express or implied. See the License for the specific language
+  governing permissions and limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>gov.nasa.jpl.mudrod</groupId>
+        <artifactId>mudrod-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+        <relativePath>../</relativePath>
+    </parent>
+
+    <artifactId>mudrod-core</artifactId>
+
+    <name>Mudrod :: Core</name>
+    <description>Core Mudrod library implementation.</description>
+
+    <properties>
+        <!-- This property is the name of the directory containing the SVM/SGD Model. It is used during build to create a zip file from this directory -->
+        <svmSgdModel.value>javaSVMWithSGDModel</svmSgdModel.value>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jdom</groupId>
+            <artifactId>jdom</artifactId>
+        </dependency>
+
+        <!-- Elasticsearch dependencies -->
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>transport</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.elasticsearch.plugin</groupId>
+                    <artifactId>transport-netty4-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch-spark-20_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.carrotsearch</groupId>
+            <artifactId>hppc</artifactId>
+        </dependency>
+        <!-- End of Elasticsearch dependencies -->
+
+        <!-- Spark dependencies -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fommil.netlib</groupId>
+            <artifactId>all</artifactId>
+            <type>pom</type>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+        </dependency>
+
+        <dependency>
+            <!-- Needed to run the Spark UI during log ingestion -->
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+            <version>3.1.0</version>
+            <scope>compile</scope>
+        </dependency>
+        <!-- End of Spark dependencies -->
+
+        <dependency>
+            <groupId>org.codehaus.jettison</groupId>
+            <artifactId>jettison</artifactId>
+            <version>1.3.8</version>
+        </dependency>
+
+        <!-- Logging dependencies -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+        <!-- End of Logging dependencies -->
+
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>2.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.jena</groupId>
+            <artifactId>jena-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/main/resources</directory>
+                <filtering>true</filtering>
+                <excludes>
+                    <exclude>${svmSgdModel.value}/**</exclude>
+                </excludes>
+            </resource>
+
+            <resource>
+                <directory>${project.build.directory}</directory>
+                <includes>
+                    <include>${svmSgdModel.value}.zip</include>
+                </includes>
+            </resource>
+
+            <resource>
+                <directory>${basedir}/../</directory>
+                <targetPath>META-INF</targetPath>
+                <includes>
+                    <include>LICENSE.txt</include>
+                    <include>NOTICE.txt</include>
+                </includes>
+            </resource>
+        </resources>
+
+        <plugins>
+            <!-- generates the bin launchers -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>appassembler-maven-plugin</artifactId>
+                <version>1.10</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <repositoryLayout>flat</repositoryLayout>
+                    <repositoryName>lib</repositoryName>
+                    <programs>
+                        <program>
+                            <mainClass>gov.nasa.jpl.mudrod.main.MudrodEngine
+                            </mainClass>
+                            <name>mudrod-engine</name>
+                        </program>
+                    </programs>
+                </configuration>
+            </plugin>
+
+            <!-- Generates the distribution package -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.6</version>
+                <executions>
+                    <execution>
+                        <id>zipSVMWithSGDModel</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                            <finalName>${svmSgdModel.value}</finalName>
+                            <outputDirectory>${project.build.directory}
+                            </outputDirectory>
+                            <descriptors>
+                                <descriptor>
+                                    ${basedir}/src/main/assembly/zipSVMWithSGDModel.xml
+                                </descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+
+                    <execution>
+                        <id>generateDistribution</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                            <descriptors>
+                                <descriptor>
+                                    ${basedir}/src/main/assembly/bin.xml
+                                </descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.0.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <manifestEntries>
+                                        <Main-Class>
+                                            gov.nasa.jpl.mudrod.main.MudrodEngine
+                                        </Main-Class>
+                                        <Build-Number>${implementation.build}
+                                        </Build-Number>
+                                    </manifestEntries>
+                                </transformer>
+                            </transformers>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <finalName>
+                                ${project.artifactId}-uber-${project.version}
+                            </finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>release</id>
+            <build>
+                <resources>
+                    <resource>
+                        <directory>${basedir}/../</directory>
+                        <targetPath>
+                            ${project.build.directory}/apidocs/META-INF
+                        </targetPath>
+                        <includes>
+                            <include>LICENSE.txt</include>
+                            <include>NOTICE.txt</include>
+                        </includes>
+                    </resource>
+                </resources>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
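
As a usage note: with the configuration above, a plain

    mvn clean package

should produce both the appassembler launcher (target/appassembler/bin/mudrod-engine, wrapping gov.nasa.jpl.mudrod.main.MudrodEngine) and, via the shade plugin's finalName of ${project.artifactId}-uber-${project.version}, a runnable uber jar at target/mudrod-core-uber-0.0.1-SNAPSHOT.jar. These artifact names are inferred from the POM, not from an actual build run.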

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/core/src/main/assembly/bin.xml b/core/src/main/assembly/bin.xml
new file mode 100644
index 0000000..7977d65
--- /dev/null
+++ b/core/src/main/assembly/bin.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS"
+  BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+  express or implied. See the License for the specific language
+  governing permissions and limitations under the License.
+-->
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>bin</id>
+    <formats>
+        <format>tar.gz</format>
+        <format>zip</format>
+    </formats>
+    <fileSets>
+        <fileSet>
+            <directory>target/appassembler/bin</directory>
+            <outputDirectory>bin</outputDirectory>
+            <excludes>
+                <exclude>*.bat</exclude>
+            </excludes>
+            <lineEnding>unix</lineEnding>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0644</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>target/appassembler/bin</directory>
+            <outputDirectory>bin</outputDirectory>
+            <includes>
+                <include>*.bat</include>
+            </includes>
+            <lineEnding>dos</lineEnding>
+            <directoryMode>0755</directoryMode>
+            <fileMode>0644</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>target/appassembler/lib</directory>
+            <outputDirectory>lib</outputDirectory>
+        </fileSet>
+    </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/assembly/zipSVMWithSGDModel.xml
----------------------------------------------------------------------
diff --git a/core/src/main/assembly/zipSVMWithSGDModel.xml b/core/src/main/assembly/zipSVMWithSGDModel.xml
new file mode 100644
index 0000000..aec117f
--- /dev/null
+++ b/core/src/main/assembly/zipSVMWithSGDModel.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS"
+  BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+  express or implied. See the License for the specific language
+  governing permissions and limitations under the License.
+-->
+<assembly
+        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>zipSVMWithSGDModel</id>
+    <baseDirectory>${svmSgdModel.value}</baseDirectory>
+    <formats>
+        <format>zip</format>
+    </formats>
+    <fileSets>
+        <fileSet>
+            <directory>${basedir}/src/main/resources/${svmSgdModel.value}</directory>
+            <outputDirectory>.</outputDirectory>
+        </fileSet>
+    </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java
new file mode 100644
index 0000000..2de9cf6
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryEngineAbstract.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+public abstract class DiscoveryEngineAbstract extends MudrodAbstract implements Serializable {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+
+  public DiscoveryEngineAbstract(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  /**
+   * Abstract method of preprocess
+   */
+  public abstract void preprocess();
+
+  /**
+   * Abstract method of process
+   */
+  public abstract void process();
+
+  /**
+   * Abstract method of output
+   */
+  public abstract void output();
+}
\ No newline at end of file
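
DiscoveryEngineAbstract is the template the concrete engines below follow: subclasses inherit the configuration and drivers from MudrodAbstract and fill in the three pipeline phases. A minimal sketch of such a subclass, assuming only the signatures shown above (the class name and method bodies are hypothetical):

// Hypothetical example; not part of this commit.
package gov.nasa.jpl.mudrod.discoveryengine;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;

import java.util.Properties;

public class NoOpDiscoveryEngine extends DiscoveryEngineAbstract {

  private static final long serialVersionUID = 1L;

  public NoOpDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark); // MudrodAbstract keeps props, es and spark for the phases below
  }

  @Override
  public void preprocess() { /* prepare raw data */ }

  @Override
  public void process() { /* run the analysis */ }

  @Override
  public void output() { /* emit results */ }
}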

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java
new file mode 100644
index 0000000..94e8094
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/DiscoveryStepAbstract.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+
+import java.util.Properties;
+
+/*
+ * Generic class of discovery engine step
+ */
+public abstract class DiscoveryStepAbstract extends MudrodAbstract {
+
+  /**
+   * 
+   */
+  private static final long serialVersionUID = 1L;
+
+  public DiscoveryStepAbstract(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  /**
+   * Abstract method of step execution without a parameter
+   *
+   * @return An instance of Object
+   */
+  public abstract Object execute();
+
+  /**
+   * Abstract method of step execution with a parameter
+   *
+   * @param o an instance of object
+   * @return An instance of object
+   */
+  public abstract Object execute(Object o);
+
+}
\ No newline at end of file
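
The concrete steps used by the engines below (ApiHarvester, ImportLogFile, and so on) implement this contract. A minimal hypothetical step for illustration; only the constructor and execute signatures are taken from the class above:

// Hypothetical example; not part of this commit.
package gov.nasa.jpl.mudrod.discoveryengine;

import gov.nasa.jpl.mudrod.driver.ESDriver;
import gov.nasa.jpl.mudrod.driver.SparkDriver;

import java.util.Properties;

public class ExampleStep extends DiscoveryStepAbstract {

  private static final long serialVersionUID = 1L;

  public ExampleStep(Properties props, ESDriver es, SparkDriver spark) {
    super(props, es, spark);
  }

  @Override
  public Object execute() {
    // A real step would use this.es / this.spark here.
    return null;
  }

  @Override
  public Object execute(Object o) {
    return execute(); // delegate to the no-argument variant in this sketch
  }
}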

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java
new file mode 100644
index 0000000..e994825
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MetadataDiscoveryEngine.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.metadata.pre.ApiHarvester;
+import gov.nasa.jpl.mudrod.metadata.pre.MatrixGenerator;
+import gov.nasa.jpl.mudrod.metadata.process.MetadataAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Supports preprocessing and processing of metadata
+ */
+public class MetadataDiscoveryEngine extends DiscoveryEngineAbstract implements Serializable {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(MetadataDiscoveryEngine.class);
+
+  public MetadataDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  /**
+   * Method of preprocessing metadata
+   */
+  public void preprocess() {
+    LOG.info("Starting metadata preprocessing...");
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract harvester = new ApiHarvester(this.props, this.es, this.spark);
+    harvester.execute();
+
+    endTime = System.currentTimeMillis();
+    LOG.info("Finished metadata preprocessing. Time elapsed: {}s", (endTime - startTime) / 1000);
+  }
+
+  /**
+   * Method of processing metadata
+   */
+  public void process() {
+    LOG.info("Starting metadata processing...");
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract matrix = new MatrixGenerator(this.props, this.es, this.spark);
+    matrix.execute();
+
+    DiscoveryStepAbstract svd = new MetadataAnalyzer(this.props, this.es, this.spark);
+    svd.execute();
+
+    endTime = System.currentTimeMillis();
+    LOG.info("Finished metadata processing. Time elapsed: {}s", (endTime - startTime) / 1000);
+  }
+
+  public void output() {
+  }
+}
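
For orientation, a hypothetical driver snippet showing how this engine's phases are invoked in order. The ESDriver(props) constructor appears later in this commit; the no-argument SparkDriver constructor and the contents of props are assumptions:

// Hypothetical usage; props must carry the Elasticsearch/Spark settings
// the drivers expect (see MudrodConstants elsewhere in this commit).
Properties props = new Properties();
ESDriver es = new ESDriver(props);
SparkDriver spark = new SparkDriver(); // assumed constructor
DiscoveryEngineAbstract engine = new MetadataDiscoveryEngine(props, es, spark);
engine.preprocess(); // harvests metadata via ApiHarvester
engine.process();    // builds the term matrix, then runs MetadataAnalyzer
engine.output();     // currently a no-op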

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java
new file mode 100644
index 0000000..b3f1831
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/MudrodAbstract.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.apache.commons.io.IOUtils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckForNull;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * This is the most generic class of Mudrod
+ */
+public abstract class MudrodAbstract implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MudrodAbstract.class);
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  protected Properties props = new Properties();
+  protected ESDriver es = null;
+  protected SparkDriver spark = null;
+  protected long startTime;
+  protected long endTime;
+
+  protected static final String ES_SETTINGS = "elastic_settings.json";
+  protected static final String ES_MAPPINGS = "elastic_mappings.json";
+
+  public MudrodAbstract(Properties props, ESDriver es, SparkDriver spark) {
+    this.props = props;
+    this.es = es;
+    this.spark = spark;
+
+    if (this.props != null) {
+      this.initMudrod();
+    }
+  }
+
+  /**
+   * Method of setting up essential configuration for MUDROD to start
+   */
+  @CheckForNull
+  protected void initMudrod() {
+    InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS);
+    InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS);
+    JSONObject settingsJSON = null;
+    JSONObject mappingJSON = null;
+
+    try {
+      settingsJSON = new JSONObject(IOUtils.toString(settingsStream));
+    } catch (JSONException | IOException e1) {
+      LOG.error("Error reading Elasticsearch settings!", e1);
+    }
+
+    try {
+      mappingJSON = new JSONObject(IOUtils.toString(mappingsStream));
+    } catch (JSONException | IOException e1) {
+      LOG.error("Error reading Elasticsearch mappings!", e1);
+    }
+
+    try {
+      if (settingsJSON != null && mappingJSON != null) {
+        this.es.putMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME), settingsJSON.toString(), mappingJSON.toString());
+      }
+    } catch (IOException e) {
+      LOG.error("Error entering Elasticsearch Mappings!", e);
+    }
+  }
+
+  /**
+   * Get driver of Elasticsearch
+   *
+   * @return driver of Elasticsearch
+   */
+  public ESDriver getES() {
+    return this.es;
+  }
+
+  /**
+   * Get configuration of MUDROD (read from configuration file)
+   *
+   * @return configuration of MUDROD
+   */
+  public Properties getConfig() {
+    return this.props;
+  }
+}
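
One detail worth noting: the constructor only calls initMudrod() when a non-null Properties is supplied, so the Elasticsearch settings/mappings bootstrap can be skipped entirely. A hedged sketch of that behavior:

// Hypothetical: a null Properties skips initMudrod(), so no index or
// mapping setup is attempted (handy when no cluster is available).
MudrodAbstract stub = new MetadataDiscoveryEngine(null, null, null);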

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java
new file mode 100644
index 0000000..fe8066f
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/OntologyDiscoveryEngine.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.ontology.pre.AggregateTriples;
+import gov.nasa.jpl.mudrod.ontology.process.OntologyLinkCal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Supports preprocessing and processing of ontologies
+ */
+public class OntologyDiscoveryEngine extends DiscoveryEngineAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(OntologyDiscoveryEngine.class);
+
+  public OntologyDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  /**
+   * Method of preprocessing ontology
+   */
+  public void preprocess() {
+    LOG.info("*****************Ontology preprocessing starts******************");
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract at = new AggregateTriples(this.props, this.es, this.spark);
+    at.execute();
+
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************Ontology preprocessing ends******************Took {}s", (endTime - startTime) / 1000);
+  }
+
+  /**
+   * Method of processing ontology
+   */
+  public void process() {
+    LOG.info("*****************Ontology processing starts******************");
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract ol = new OntologyLinkCal(this.props, this.es, this.spark);
+    ol.execute();
+
+    endTime = System.currentTimeMillis();
+    LOG.info("*****************Ontology processing ends******************Took {}s", (endTime - startTime) / 1000);
+  }
+
+  public void output() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java
new file mode 100644
index 0000000..9805b79
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/RecommendEngine.java
@@ -0,0 +1,77 @@
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.recommendation.pre.ImportMetadata;
+import gov.nasa.jpl.mudrod.recommendation.pre.MetadataTFIDFGenerator;
+import gov.nasa.jpl.mudrod.recommendation.pre.NormalizeVariables;
+import gov.nasa.jpl.mudrod.recommendation.pre.SessionCooccurence;
+import gov.nasa.jpl.mudrod.recommendation.process.AbstractBasedSimilarity;
+import gov.nasa.jpl.mudrod.recommendation.process.VariableBasedSimilarity;
+import gov.nasa.jpl.mudrod.recommendation.process.sessionBasedCF;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class RecommendEngine extends DiscoveryEngineAbstract {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(RecommendEngine.class);
+
+  public RecommendEngine(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    LOG.info("Started Mudrod Recommend Engine.");
+  }
+
+  @Override
+  public void preprocess() {
+    LOG.info("*****************Recommendation preprocessing starts******************");
+
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract harvester = new ImportMetadata(this.props, this.es, this.spark);
+    harvester.execute();
+
+    DiscoveryStepAbstract tfidf = new MetadataTFIDFGenerator(this.props, this.es, this.spark);
+    tfidf.execute();
+
+    DiscoveryStepAbstract sessionMatrixGen = new SessionCooccurence(this.props, this.es, this.spark);
+    sessionMatrixGen.execute();
+
+    DiscoveryStepAbstract transformer = new NormalizeVariables(this.props, this.es, this.spark);
+    transformer.execute();
+
+    endTime = System.currentTimeMillis();
+
+    LOG.info("*****************Recommendation preprocessing ends******************Took {}s", (endTime - startTime) / 1000);
+  }
+
+  @Override
+  public void process() {
+    LOG.info("*****************Recommendation processing starts******************");
+
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract tfCF = new AbstractBasedSimilarity(this.props, this.es, this.spark);
+    tfCF.execute();
+
+    DiscoveryStepAbstract cbCF = new VariableBasedSimilarity(this.props, this.es, this.spark);
+    cbCF.execute();
+
+    DiscoveryStepAbstract sbCF = new sessionBasedCF(this.props, this.es, this.spark);
+    sbCF.execute();
+
+    endTime = System.currentTimeMillis();
+
+    LOG.info("*****************Recommendation processing ends******************Took {}s", (endTime - startTime) / 1000);
+  }
+
+  @Override
+  public void output() {
+    // TODO Auto-generated method stub
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java
new file mode 100644
index 0000000..4657222
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/WeblogDiscoveryEngine.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
+
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import gov.nasa.jpl.mudrod.weblog.pre.*;
+import gov.nasa.jpl.mudrod.weblog.process.ClickStreamAnalyzer;
+import gov.nasa.jpl.mudrod.weblog.process.UserHistoryAnalyzer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Supports preprocessing and processing of web logs
+ */
+public class WeblogDiscoveryEngine extends DiscoveryEngineAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(WeblogDiscoveryEngine.class);
+  public String timeSuffix = null;
+
+  public WeblogDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    LOG.info("Started Mudrod Weblog Discovery Engine.");
+  }
+
+  /**
+   * Get log file list from a directory
+   *
+   * @param logDir path to the directory containing logs, either local or in HDFS.
+   * @return a list of log files
+   */
+  public List<String> getFileList(String logDir) {
+
+    ArrayList<String> inputList = new ArrayList<>();
+    if (!logDir.startsWith("hdfs://")) {
+      File directory = new File(logDir);
+      File[] fList = directory.listFiles();
+      if (fList != null) { // guard against a missing or unreadable directory
+        for (File file : fList) {
+          if (file.isFile() && file.getName().matches(".*\\d+.*") && file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
+            inputList.add(file.getName().replace(props.getProperty(MudrodConstants.HTTP_PREFIX), ""));
+          }
+        }
+      }
+    } else {
+      Configuration conf = new Configuration();
+      try (FileSystem fs = FileSystem.get(new URI(logDir), conf)) {
+        FileStatus[] fileStatus;
+        fileStatus = fs.listStatus(new Path(logDir));
+        for (FileStatus status : fileStatus) {
+          String path1 = status.getPath().toString();
+          if (path1.matches(".*\\d+.*") && path1.contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
+
+            String time = path1.substring(path1.lastIndexOf('.') + 1);
+            inputList.add(time);
+          }
+        }
+      } catch (IllegalArgumentException | IOException | URISyntaxException e) {
+        LOG.error("An error occurred whilst obtaining the log file list.", e);
+      }
+    }
+
+    return inputList;
+  }
+
+  /**
+   * Method of preprocessing web logs and generating vocabulary similarity
+   * from them
+   */
+  @Override
+  public void preprocess() {
+    LOG.info("Starting Web log preprocessing.");
+
+    ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR));
+
+    for (int i = 0; i < inputList.size(); i++) {
+      timeSuffix = inputList.get(i);
+      props.put(MudrodConstants.TIME_SUFFIX, timeSuffix);
+      startTime = System.currentTimeMillis();
+      LOG.info("Processing logs dated {}", inputList.get(i));
+
+      DiscoveryStepAbstract im = new ImportLogFile(this.props, this.es, this.spark);
+      im.execute();
+
+      DiscoveryStepAbstract cd = new CrawlerDetection(this.props, this.es, this.spark);
+      cd.execute();
+
+      DiscoveryStepAbstract sg = new SessionGenerator(this.props, this.es, this.spark);
+      sg.execute();
+
+      DiscoveryStepAbstract ss = new SessionStatistic(this.props, this.es, this.spark);
+      ss.execute();
+
+      DiscoveryStepAbstract rr = new RemoveRawLog(this.props, this.es, this.spark);
+      rr.execute();
+
+      endTime = System.currentTimeMillis();
+
+      LOG.info("Web log preprocessing for logs dated {} complete. Time elapsed {} seconds.", inputList.get(i), (endTime - startTime) / 1000);
+    }
+
+    DiscoveryStepAbstract hg = new HistoryGenerator(this.props, this.es, this.spark);
+    hg.execute();
+
+    DiscoveryStepAbstract cg = new ClickStreamGenerator(this.props, this.es, this.spark);
+    cg.execute();
+
+    LOG.info("Web log preprocessing (user history and clickstream) complete.");
+  }
+
+  /**
+   * Method of web log ingest
+   */
+  public void logIngest() {
+    LOG.info("Starting Web log ingest.");
+    ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR));
+    for (int i = 0; i < inputList.size(); i++) {
+      timeSuffix = inputList.get(i);
+      props.put(MudrodConstants.TIME_SUFFIX, timeSuffix);
+      DiscoveryStepAbstract im = new ImportLogFile(this.props, this.es, this.spark);
+      im.execute();
+    }
+
+    LOG.info("Web log ingest complete.");
+
+  }
+
+  /**
+   * Method of reconstructing user sessions from raw web logs
+   */
+  public void sessionRestruct() {
+    LOG.info("Starting Session reconstruction.");
+    ArrayList<String> inputList = (ArrayList<String>) getFileList(props.getProperty(MudrodConstants.DATA_DIR));
+    for (int i = 0; i < inputList.size(); i++) {
+      timeSuffix = inputList.get(i); // change timeSuffix dynamically
+      props.put(MudrodConstants.TIME_SUFFIX, timeSuffix);
+      DiscoveryStepAbstract cd = new CrawlerDetection(this.props, this.es, this.spark);
+      cd.execute();
+
+      DiscoveryStepAbstract sg = new SessionGenerator(this.props, this.es, this.spark);
+      sg.execute();
+
+      DiscoveryStepAbstract ss = new SessionStatistic(this.props, this.es, this.spark);
+      ss.execute();
+
+      DiscoveryStepAbstract rr = new RemoveRawLog(this.props, this.es, this.spark);
+      rr.execute();
+
+      endTime = System.currentTimeMillis();
+    }
+    LOG.info("Session reconstruction complete.");
+  }
+
+  @Override
+  public void process() {
+    LOG.info("Starting Web log processing.");
+    startTime = System.currentTimeMillis();
+
+    DiscoveryStepAbstract svd = new ClickStreamAnalyzer(this.props, this.es, this.spark);
+    svd.execute();
+
+    DiscoveryStepAbstract ua = new UserHistoryAnalyzer(this.props, this.es, this.spark);
+    ua.execute();
+
+    endTime = System.currentTimeMillis();
+    LOG.info("Web log processing complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
+  }
+
+  @Override
+  public void output() {
+    // not implemented yet!
+  }
+}
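
getFileList() accepts either a local directory or an hdfs:// URI, returning the date/time suffixes parsed from matching log file names. A hypothetical call for each case (the paths are made up):

// Hypothetical paths, for illustration only.
WeblogDiscoveryEngine engine = new WeblogDiscoveryEngine(props, es, spark);
List<String> localSuffixes = engine.getFileList("/data/mudrod/logs");
List<String> hdfsSuffixes = engine.getFileList("hdfs://namenode:9000/mudrod/logs");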

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java
new file mode 100644
index 0000000..21e528e
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/discoveryengine/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the MUDROD abstract base classes for discovery steps
+ * and engines. Workflow classes such as WeblogDiscoveryEngine,
+ * OntologyDiscoveryEngine, and MetadataDiscoveryEngine are also included here.
+ */
+package gov.nasa.jpl.mudrod.discoveryengine;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
new file mode 100644
index 0000000..d1af04a
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/driver/ESDriver.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.driver;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import com.google.gson.GsonBuilder;
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import gov.nasa.jpl.mudrod.main.MudrodEngine;
+import gov.nasa.jpl.mudrod.utils.ESTransportClient;
+import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
+import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Fuzziness;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.SuggestBuilder;
+import org.elasticsearch.search.suggest.SuggestBuilders;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Driver implementation for all Elasticsearch functionality.
+ */
+public class ESDriver implements Serializable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class);
+  private static final long serialVersionUID = 1L;
+  private transient Client client = null;
+  private transient Node node = null;
+  private transient BulkProcessor bulkProcessor = null;
+
+  /**
+   * Default constructor for this class. To load client configuration call the
+   * parameterized constructor.
+   */
+  public ESDriver() {
+    // Default constructor, to load configuration call ESDriver(props)
+  }
+
+  /**
+   * Parameterized constructor which accepts a {@link java.util.Properties}
+   *
+   * @param props a populated properties object.
+   */
+  public ESDriver(Properties props) {
+    try {
+      setClient(makeClient(props));
+    } catch (IOException e) {
+      LOG.error("Error whilst constructing Elasticsearch client.", e);
+    }
+  }
+
+  public void createBulkProcessor() {
+    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500);
+    setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() {
+      @Override
+      public void beforeBulk(long executionId, BulkRequest request) {
+        LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!");
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!");
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+        LOG.error("Bulk request has failed!");
+        throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure);
+      }
+    }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1)
+        .build());
+  }
+
+  public void destroyBulkProcessor() {
+    try {
+      getBulkProcessor().awaitClose(20, TimeUnit.MINUTES);
+      setBulkProcessor(null);
+      refreshIndex();
+    } catch (InterruptedException e) {
+      LOG.error("Error destroying the Bulk Processor.", e);
+    }
+  }
+
+  public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException {
+
+    boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists();
+    if (exists) {
+      return;
+    }
+
+    getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
+    getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet();
+  }
+
+  public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException {
+    return this.customAnalyzing(indexName, "cody", str);
+  }
+
+  public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException {
+    String[] strList = str.toLowerCase().split(",");
+    for (int i = 0; i < strList.length; i++) {
+      String tmp = "";
+      AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get();
+      for (AnalyzeToken token : r.getTokens()) {
+        tmp += token.getTerm() + " ";
+      }
+      strList[i] = tmp.trim();
+    }
+    return String.join(",", strList);
+  }
+
+  public List<String> customAnalyzing(String indexName, List<String> list) throws InterruptedException, ExecutionException {
+    if (list == null) {
+      return list;
+    }
+    int size = list.size();
+    List<String> customlist = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      customlist.add(this.customAnalyzing(indexName, list.get(i)));
+    }
+
+    return customlist;
+  }
+
+  public void deleteAllByQuery(String index, String type, QueryBuilder query) {
+    createBulkProcessor();
+    SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType(SearchType.QUERY_AND_FETCH).setTypes(type).setScroll(new TimeValue(60000)).setQuery(query).setSize(10000).execute()
+        .actionGet();
+
+    while (true) {
+      for (SearchHit hit : scrollResp.getHits().getHits()) {
+        DeleteRequest deleteRequest = new DeleteRequest(index, type, hit.getId());
+        getBulkProcessor().add(deleteRequest);
+      }
+
+      scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+      if (scrollResp.getHits().getHits().length == 0) {
+        break;
+      }
+
+    }
+    destroyBulkProcessor();
+  }
+
+  public void deleteType(String index, String type) {
+    this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery());
+  }
+
+  public List<String> getTypeListWithPrefix(Object object, Object object2) {
+    ArrayList<String> typeList = new ArrayList<>();
+    GetMappingsResponse res;
+    try {
+      res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get();
+      ImmutableOpenMap<String, MappingMetaData> mapping = res.mappings().get(object.toString());
+      for (ObjectObjectCursor<String, MappingMetaData> c : mapping) {
+        if (c.key.startsWith(object2.toString())) {
+          typeList.add(c.key);
+        }
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e);
+    }
+    return typeList;
+  }
+
+  public List<String> getIndexListWithPrefix(Object object) {
+
+    LOG.info("Retrieving index list with prefix: {}", object.toString());
+    String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices();
+
+    ArrayList<String> indexList = new ArrayList<>();
+    int length = indices.length;
+    for (int i = 0; i < length; i++) {
+      String indexName = indices[i];
+      if (indexName.startsWith(object.toString())) {
+        indexList.add(indexName);
+      }
+    }
+
+    return indexList;
+  }
+
+  public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException {
+    return searchByQuery(index, type, query, false);
+  }
+
+  @SuppressWarnings("unchecked")
+  public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException {
+    boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
+    if (!exists) {
+      return null;
+    }
+
+    QueryBuilder qb = QueryBuilders.queryStringQuery(query);
+    SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet();
+
+    // Map of K,V pairs where the key is the field name from the search result and the value is the name to return for that field. They are not always the same.
+    Map<String, String> fieldsToReturn = new HashMap<>();
+
+    fieldsToReturn.put("Dataset-ShortName", "Short Name");
+    fieldsToReturn.put("Dataset-LongName", "Long Name");
+    fieldsToReturn.put("DatasetParameter-Topic", "Topic");
+    fieldsToReturn.put("Dataset-Description", "Dataset-Description");
+    fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date");
+
+    if (bDetail) {
+      fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat");
+      fieldsToReturn.put("Dataset-Doi", "Dataset-Doi");
+      fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level");
+      fieldsToReturn.put("DatasetCitation-Version", "Version");
+      fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName");
+      fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName");
+      fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category");
+      fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath");
+      fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full");
+      fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full");
+      fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail");
+
+      fieldsToReturn.put("DatasetRegion-Region", "Region");
+      fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat");
+      fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat");
+      fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon");
+      fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon");
+      fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long");
+      fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong");
+
+      fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution");
+      fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat");
+      fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution");
+      fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution");
+      fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution");
+      fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution");
+    }
+
+    List<Map<String, Object>> searchResults = new ArrayList<>();
+
+    for (SearchHit hit : response.getHits().getHits()) {
+      Map<String, Object> source = hit.getSource();
+
+      Map<String, Object> searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.containsKey(entry.getKey()))
+          .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue));
+
+      // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result
+
+      // Some results require special handling/formatting:
+      // Release Date formatting
+      LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList<String>) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate();
+      searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE));
+
+      if (bDetail) {
+
+        // DataFormat value, translate RAW to BINARY
+        if ("RAW".equals(searchResult.get("DataFormat"))) {
+          searchResult.put("DataFormat", "BINARY");
+        }
+
+        // DatasetLocationPolicy-BasePath Should only contain ftp, http, or https URLs
+        List<String> urls = ((List<String>) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList());
+        searchResult.put("DatasetLocationPolicy-BasePath", urls);
+
+        // Time Span Formatting
+        LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate();
+        LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ?
+            null :
+            Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate();
+        searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE)));
+
+        // Temporal resolution can come from one of two fields
+        searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution"));
+
+        // Special formatting for spatial resolution
+        String latResolution = (String) searchResult.get("Dataset-LatitudeResolution");
+        String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution");
+        if (!latResolution.isEmpty() && !lonResolution.isEmpty()) {
+          searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)");
+        } else {
+          String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution");
+          String alongResolution = (String) searchResult.get("Dataset-AlongTrackResolution");
+          // Across/along-track resolutions are stored in meters; convert to km for display
+          double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000;
+          double dAlongResolution = Double.parseDouble(alongResolution) / 1000;
+          searchResult.put("SpatialResolution", dAlongResolution + " km (Along) x " + dAcrossResolution + " km (Across)");
+        }
+
+        // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies.
+        List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"),
+            (List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));
+
+        searchResult.put("Measurements", measurements);
+
+      }
+
+      searchResults.add(searchResult);
+    }
+
+    Map<String, List<?>> pdResults = new HashMap<>();
+    pdResults.put("PDResults", searchResults);
+
+    return new GsonBuilder().create().toJson(pdResults);
+  }
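+
+  /*
+   * Usage sketch (illustrative only): the index and type names below are
+   * placeholder assumptions, not values guaranteed by this codebase.
+   *
+   *   ESDriver es = new ESDriver(new MudrodEngine().loadConfig());
+   *   String brief = es.searchByQuery("mudrod", "metadata", "ocean wind");
+   *   String detail = es.searchByQuery("mudrod", "metadata", "ocean wind", true);
+   *   es.close();
+   */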
+
+  /**
+   * Builds a List of Measurement Hierarchies given the individual source lists.
+   * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail
+   * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete.
+   *
+   * For example, if the input is:
+   * <pre>
+   * topics = ["Oceans", "Oceans"]
+   * terms = ["Sea Surface Topography", "Ocean Waves"]
+   * variables = ["Sea Surface Height", "Significant Wave Height"]
+   * variableDetails = ["None", "None"]
+   * </pre>
+   *
+   * The output would be:
+   * <pre>
+   *   [
+   *     ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
+   *     ["Oceans", "Ocean Waves", "Significant Wave Height"]
+   *   ]
+   * </pre>
+   *
+   * @param topics List of topics, the first element of a measurement
+   * @param terms List of terms, the second element of a measurement
+   * @param variables List of variables, the third element of a measurement
+   * @param variableDetails List of variable details, the fourth element of a measurement
+   *
+   * @return A List where each element is a single hierarchy (as a List) built from the provided input lists.
+   */
+  private List<List<String>> buildMeasurementHierarchies(List<String> topics, List<String> terms, List<String> variables, List<String> variableDetails) {
+
+    List<List<String>> measurements = new ArrayList<>();
+
+    for (int x = 0; x < topics.size(); x++) {
+      measurements.add(new ArrayList<>());
+      measurements.get(x).add(topics.get(x));
+      // Only add the next 'level' if we can
+      if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) {
+        measurements.get(x).add(terms.get(x));
+        if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) {
+          measurements.get(x).add(variables.get(x));
+          if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) {
+            measurements.get(x).add(variableDetails.get(x));
+          }
+        }
+      }
+    }
+
+    return measurements;
+
+  }
+
+  public List<String> autoComplete(String index, String term) {
+    boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
+    if (!exists) {
+      return new ArrayList<>();
+    }
+
+    List<String> suggestList = new ArrayList<>();
+
+    // Note: the "Dataset-Metadata" completion field must be configured in the ES mapping
+    CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100);
+    SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
+    SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet();
+
+    Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator();
+
+    while (iterator.hasNext()) {
+      Suggest.Suggestion.Entry.Option next = iterator.next();
+      String suggest = next.getText().string().toLowerCase();
+      suggestList.add(suggest);
+    }
+
+    // De-duplicate suggestions while keeping a List return type
+    return new ArrayList<>(new HashSet<>(suggestList));
+  }
+
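+  /*
+   * Usage sketch (illustrative; "mudrod" is a placeholder index name, and
+   * the "Dataset-Metadata" field must be mapped with type "completion" in
+   * that index for any suggestions to come back):
+   *
+   *   List<String> suggestions = es.autoComplete("mudrod", "oce");
+   *   // e.g. ["ocean wind", "ocean temperature"] -- made-up output
+   */
+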
+  public void close() {
+    client.close();
+  }
+
+  public void refreshIndex() {
+    client.admin().indices().prepareRefresh().execute().actionGet();
+  }
+
+  /**
+   * Generates a TransportClient or NodeClient
+   *
+   * @param props a populated {@link java.util.Properties} object
+   * @return a constructed {@link org.elasticsearch.client.Client}
+   * @throws IOException if there is an error building the
+   *                     {@link org.elasticsearch.client.Client}
+   */
+  protected Client makeClient(Properties props) throws IOException {
+    String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER);
+    String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
+    String[] hosts = hostsString.split(",");
+    String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT);
+    int port = Integer.parseInt(portStr);
+
+    Settings.Builder settingsBuilder = Settings.builder();
+
+    // Set the cluster name and build the settings
+    if (!clusterName.isEmpty())
+      settingsBuilder.put("cluster.name", clusterName);
+
+    settingsBuilder.put("http.type", "netty3");
+    settingsBuilder.put("transport.type", "netty3");
+
+    Settings settings = settingsBuilder.build();
+
+    Client client = null;
+
+    // Prefer TransportClient
+    if (hosts != null && port > 1) {
+      TransportClient transportClient = new ESTransportClient(settings);
+      for (String host : hosts)
+        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+      client = transportClient;
+    } else if (clusterName != null) {
+      node = new Node(settings);
+      client = node.client();
+    }
+
+    return client;
+  }
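+
+  /*
+   * A minimal sketch of the properties makeClient reads; the values shown
+   * are placeholder assumptions for a local single-node cluster, not
+   * shipped defaults:
+   *
+   *   Properties p = new Properties();
+   *   p.setProperty(MudrodConstants.ES_CLUSTER, "elasticsearch");
+   *   p.setProperty(MudrodConstants.ES_UNICAST_HOSTS, "127.0.0.1");
+   *   p.setProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT, "9300");
+   */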
+
+  /**
+   * Main method used to invoke the ESDriver implementation.
+   *
+   * @param args no arguments are required to invoke the Driver.
+   */
+  public static void main(String[] args) {
+    MudrodEngine mudrodEngine = new MudrodEngine();
+    ESDriver es = new ESDriver(mudrodEngine.loadConfig());
+    es.getTypeListWithPrefix("podaacsession", "sessionstats");
+  }
+
+  /**
+   * @return the client
+   */
+  public Client getClient() {
+    return client;
+  }
+
+  /**
+   * @param client the client to set
+   */
+  public void setClient(Client client) {
+    this.client = client;
+  }
+
+  /**
+   * @return the bulkProcessor
+   */
+  public BulkProcessor getBulkProcessor() {
+    return bulkProcessor;
+  }
+
+  /**
+   * @param bulkProcessor the bulkProcessor to set
+   */
+  public void setBulkProcessor(BulkProcessor bulkProcessor) {
+    this.bulkProcessor = bulkProcessor;
+  }
+
+  public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) {
+
+    UpdateRequest ur = null;
+    try {
+      ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject());
+    } catch (IOException e) {
+      LOG.error("Error whilst attempting to generate a new Update Request.", e);
+    }
+
+    return ur;
+  }
+
+  public UpdateRequest generateUpdateRequest(String index, String type, String id, Map<String, Object> fieldValueMap) {
+
+    UpdateRequest ur = null;
+    try {
+      XContentBuilder builder = jsonBuilder().startObject();
+      for (Entry<String, Object> entry : fieldValueMap.entrySet()) {
+        builder.field(entry.getKey(), entry.getValue());
+      }
+      builder.endObject();
+      ur = new UpdateRequest(index, type, id).doc(builder);
+    } catch (IOException e) {
+      LOG.error("Error whilst attempting to generate a new Update Request.", e);
+    }
+
+    return ur;
+  }
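+
+  /*
+   * Usage sketch (the index, type, id, and field names are placeholder
+   * assumptions): queue an update through the shared bulk processor.
+   *
+   *   UpdateRequest ur = es.generateUpdateRequest("mudrod", "metadata",
+   *       "someDocId", "Dataset-Description", "updated description");
+   *   es.getBulkProcessor().add(ur);
+   */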
+
+  public int getDocCount(String index, String... type) {
+    MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
+    String[] indexArr = new String[] { index };
+    return this.getDocCount(indexArr, type, search);
+  }
+
+  public int getDocCount(String[] index, String[] type) {
+    MatchAllQueryBuilder search = QueryBuilders.matchAllQuery();
+    return this.getDocCount(index, type, search);
+  }
+
+  public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) {
+    SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0);
+    SearchResponse countSr = countSrBuilder.execute().actionGet();
+    int docCount = (int) countSr.getHits().getTotalHits();
+    return docCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
new file mode 100644
index 0000000..e49c029
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/driver/SparkDriver.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.driver;
+
+import gov.nasa.jpl.mudrod.main.MudrodConstants;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.SQLContext;
+
+import java.io.File;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.util.Properties;
+//import org.apache.spark.sql.SparkSession;
+
+public class SparkDriver implements Serializable {
+
+  //TODO the commented-out code below is the API upgrade
+  //for Spark 2.0.0. It requires a large upgrade and simplification
+  //across the mudrod codebase, so it should be done in an individual ticket.
+  //  /**
+  //   *
+  //   */
+  //  private static final long serialVersionUID = 1L;
+  //  private SparkSession builder;
+  //
+  //  public SparkDriver() {
+  //    builder = SparkSession.builder()
+  //        .master("local[2]")
+  //        .config("spark.hadoop.validateOutputSpecs", "false")
+  //        .config("spark.files.overwrite", "true")
+  //        .getOrCreate();
+  //  }
+  //
+  //  public SparkSession getBuilder() {
+  //    return builder;
+  //  }
+  //
+  //  public void setBuilder(SparkSession builder) {
+  //    this.builder = builder;
+  //  }
+  //
+  //  public void close() {
+  //    builder.stop();
+  //  }
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  public transient JavaSparkContext sc;
+  public transient SQLContext sqlContext;
+
+  public SparkDriver() {
+    // empty default constructor
+  }
+
+  public SparkDriver(Properties props) {
+    SparkConf conf = new SparkConf().setAppName(props.getProperty(MudrodConstants.SPARK_APP_NAME, "MudrodSparkApp")).setIfMissing("spark.master", props.getProperty(MudrodConstants.SPARK_MASTER))
+        .set("spark.hadoop.validateOutputSpecs", "false").set("spark.files.overwrite", "true");
+
+    String esHost = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS);
+    String esPort = props.getProperty(MudrodConstants.ES_HTTP_PORT);
+
+    if (!"".equals(esHost)) {
+      conf.set("es.nodes", esHost);
+    }
+
+    if (!"".equals(esPort)) {
+      conf.set("es.port", esPort);
+    }
+
+    conf.set("spark.serializer", KryoSerializer.class.getName());
+    conf.set("es.batch.size.entries", "1500");
+
+    sc = new JavaSparkContext(conf);
+    sqlContext = new SQLContext(sc);
+  }
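+
+  /**
+   * Illustrative helper (a sketch, not part of the original API): builds a
+   * driver from the given properties and counts the lines of a text file at
+   * a caller-supplied path.
+   */
+  public static long exampleLineCount(Properties props, String path) {
+    SparkDriver spark = new SparkDriver(props);
+    // textFile returns an RDD of lines; count() forces evaluation
+    long lines = spark.sc.textFile(path).count();
+    spark.close();
+    return lines;
+  }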
+
+  public void close() {
+    sc.sc().stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
new file mode 100644
index 0000000..d9cde36
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/driver/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes commonly used Elasticsearch and Spark related
+ * functions.
+ */
+package gov.nasa.jpl.mudrod.driver;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
new file mode 100644
index 0000000..14b667d
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/integration/LinkageIntegration.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package gov.nasa.jpl.mudrod.integration;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import gov.nasa.jpl.mudrod.discoveryengine.DiscoveryStepAbstract;
+import gov.nasa.jpl.mudrod.driver.ESDriver;
+import gov.nasa.jpl.mudrod.driver.SparkDriver;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.DecimalFormat;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * Integrates vocabulary similarity results from metadata, ontology, and web logs.
+ */
+public class LinkageIntegration extends DiscoveryStepAbstract {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LinkageIntegration.class);
+  private static final long serialVersionUID = 1L;
+  transient List<LinkedTerm> termList = new ArrayList<>();
+  DecimalFormat df = new DecimalFormat("#.00");
+  private static final String INDEX_NAME = "indexName";
+  private static final String WEIGHT = "weight";
+
+  public LinkageIntegration(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  /**
+   * The data structure used to store a semantic triple (term, weight, model).
+   */
+  class LinkedTerm {
+    String term = null;
+    double weight = 0;
+    String model = null;
+
+    public LinkedTerm(String str, double w, String m) {
+      term = str;
+      weight = w;
+      model = m;
+    }
+  }
+
+  /**
+   * Method of executing integration step
+   */
+  @Override
+  public Object execute() {
+    getIngeratedList("ocean wind", 11);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  /**
+   * Method of getting integrated results
+   *
+   * @param input query string
+   * @return a map where the key is a related term and the value is its
+   * similarity to the input query, sorted in descending order of similarity
+   */
+  public Map<String, Double> appyMajorRule(String input) {
+    termList = new ArrayList<>();
+    Map<String, Double> termsMap = new HashMap<>();
+    Map<String, List<LinkedTerm>> map = new HashMap<>();
+    try {
+      map = aggregateRelatedTermsFromAllmodel(es.customAnalyzing(props.getProperty(INDEX_NAME), input));
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Error applying majority rule", e);
+    }
+
+    for (Entry<String, List<LinkedTerm>> entry : map.entrySet()) {
+      List<LinkedTerm> list = entry.getValue();
+      double sumModelWeight = 0;
+      double tmp = 0;
+      for (LinkedTerm element : list) {
+        sumModelWeight += getModelweight(element.model);
+
+        if (element.weight > tmp) {
+          tmp = element.weight;
+        }
+      }
+
+      double finalWeight = tmp + ((sumModelWeight - 2) * 0.05);
+      if (finalWeight < 0) {
+        finalWeight = 0;
+      }
+
+      if (finalWeight > 1) {
+        finalWeight = 1;
+      }
+      termsMap.put(entry.getKey(), Double.parseDouble(df.format(finalWeight)));
+    }
+
+    return sortMapByValue(termsMap);
+  }
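+
+  /*
+   * Worked example of the rule above (the model weights here are
+   * illustrative values for the *_w properties, not the shipped
+   * configuration): if a term is linked to the query by the user-history
+   * model (weight 2) and the click-stream model (weight 2), with a maximum
+   * per-model similarity of 0.70, then
+   *
+   *   finalWeight = 0.70 + ((2 + 2) - 2) * 0.05 = 0.80
+   *
+   * so terms corroborated by several models are boosted by 0.05 per unit of
+   * model weight beyond 2, with the result clamped to [0, 1].
+   */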
+
+  /**
+   * Method of getting integrated results
+   *
+   * @param input query string
+   * @param num   the number of most related terms
+   * @return a string of related terms along with corresponding similarities
+   */
+  public String getIngeratedList(String input, int num) {
+    String output = "";
+    Map<String, Double> sortedMap = appyMajorRule(input);
+    int count = 0;
+    for (Entry<String, Double> entry : sortedMap.entrySet()) {
+      if (count < num) {
+        output += entry.getKey() + " = " + entry.getValue() + ", ";
+      }
+      count++;
+    }
+    LOG.info("\n************************Integrated results***************************");
+    LOG.info(output);
+    return output;
+  }
+
+  /**
+   * Method of getting integrated results
+   *
+   * @param input query string
+   * @return a JSON object of related terms along with corresponding similarities
+   */
+  public JsonObject getIngeratedListInJson(String input) {
+    Map<String, Double> sortedMap = appyMajorRule(input);
+    int count = 0;
+    Map<String, Double> trimmedMap = new LinkedHashMap<>();
+    for (Entry<String, Double> entry : sortedMap.entrySet()) {
+      if (!entry.getKey().contains("china")) {
+        if (count < 10) {
+          trimmedMap.put(entry.getKey(), entry.getValue());
+        }
+        count++;
+      }
+    }
+
+    return mapToJson(trimmedMap);
+  }
+
+  /**
+   * Method of aggregating terms from web logs, metadata, and ontology
+   *
+   * @param input query string
+   * @return a map where the key is a related term and the value is the list
+   * of linkages for that term from the different sources
+   */
+  public Map<String, List<LinkedTerm>> aggregateRelatedTermsFromAllmodel(String input) {
+    aggregateRelatedTerms(input, props.getProperty("userHistoryLinkageType"));
+    aggregateRelatedTerms(input, props.getProperty("clickStreamLinkageType"));
+    aggregateRelatedTerms(input, props.getProperty("metadataLinkageType"));
+    aggregateRelatedTermsSWEET(input, props.getProperty("ontologyLinkageType"));
+
+    return termList.stream().collect(Collectors.groupingBy(w -> w.term));
+  }
+
+  public int getModelweight(String model) {
+    if (model.equals(props.getProperty("userHistoryLinkageType"))) {
+      return Integer.parseInt(props.getProperty("userHistory_w"));
+    }
+
+    if (model.equals(props.getProperty("clickStreamLinkageType"))) {
+      return Integer.parseInt(props.getProperty("clickStream_w"));
+    }
+
+    if (model.equals(props.getProperty("metadataLinkageType"))) {
+      return Integer.parseInt(props.getProperty("metadata_w"));
+    }
+
+    if (model.equals(props.getProperty("ontologyLinkageType"))) {
+      return Integer.parseInt(props.getProperty("ontology_w"));
+    }
+
+    // Unknown model type: fall back to a large sentinel weight
+    return 999999;
+  }
+
+  /**
+   * Method of extracting the related term from a comma-separated pair of terms
+   *
+   * @param str   a comma-separated pair, e.g. "ocean wind,wind speed"
+   * @param input query string
+   * @return whichever term of the pair is not the query string
+   */
+  public String extractRelated(String str, String input) {
+    String[] strList = str.split(",");
+    if (input.equals(strList[0])) {
+      return strList[1];
+    } else {
+      return strList[0];
+    }
+  }
+
+  public void aggregateRelatedTerms(String input, String model) {
+    // Retrieve the top 11 hits; the query term itself may appear among them and is filtered out below
+    SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("keywords", input)).addSort(WEIGHT, SortOrder.DESC).setSize(11)
+        .execute().actionGet();
+
+    LOG.info("\n************************ {} results***************************", model);
+    for (SearchHit hit : usrhis.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String keywords = (String) result.get("keywords");
+      String relatedKey = extractRelated(keywords, input);
+
+      if (!relatedKey.equals(input)) {
+        LinkedTerm lTerm = new LinkedTerm(relatedKey, (double) result.get(WEIGHT), model);
+        LOG.info("( {} {} )", relatedKey, (double) result.get(WEIGHT));
+        termList.add(lTerm);
+      }
+
+    }
+  }
+
+  /**
+   * Method of querying related terms from ontology
+   *
+   * @param input input query
+   * @param model source name
+   */
+  public void aggregateRelatedTermsSWEET(String input, String model) {
+    SearchResponse usrhis = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(model).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC)
+        .setSize(11).execute().actionGet();
+    LOG.info("\n************************ {} results***************************", model);
+    for (SearchHit hit : usrhis.getHits().getHits()) {
+      Map<String, Object> result = hit.getSource();
+      String conceptB = (String) result.get("concept_B");
+      if (!conceptB.equals(input)) {
+        LinkedTerm lTerm = new LinkedTerm(conceptB, (double) result.get(WEIGHT), model);
+        LOG.info("( {} {} )", conceptB, (double) result.get(WEIGHT));
+        termList.add(lTerm);
+      }
+    }
+  }
+
+  /**
+   * Method of sorting a map by value, in descending order
+   *
+   * @param passedMap input map
+   * @return a new map containing the same entries, ordered by descending value
+   */
+  public Map<String, Double> sortMapByValue(Map<String, Double> passedMap) {
+    return passedMap.entrySet().stream()
+        .sorted(Entry.<String, Double>comparingByValue(Comparator.reverseOrder()))
+        .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> a, LinkedHashMap::new));
+  }
+
+  /**
+   * Method of converting a map of word weights to JSON
+   *
+   * @param wordweights a map from related terms to weights
+   * @return converted JSON object
+   */
+  private JsonObject mapToJson(Map<String, Double> wordweights) {
+    Gson gson = new Gson();
+    JsonObject json = new JsonObject();
+    List<JsonObject> nodes = new ArrayList<>();
+
+    for (Entry<String, Double> entry : wordweights.entrySet()) {
+      JsonObject node = new JsonObject();
+      String key = entry.getKey();
+      Double value = entry.getValue();
+      node.addProperty("word", key);
+      node.addProperty("weight", value);
+      nodes.add(node);
+    }
+
+    JsonElement nodesElement = gson.toJsonTree(nodes);
+    json.add("ontology", nodesElement);
+
+    return json;
+  }
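+
+  /*
+   * Illustrative output shape of mapToJson (the words and weights are made
+   * up):
+   *
+   *   {"ontology":[{"word":"wind speed","weight":0.9},
+   *                {"word":"sea surface wind","weight":0.85}]}
+   */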
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/7b76fa16/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
new file mode 100644
index 0000000..7f2ba66
--- /dev/null
+++ b/core/src/main/java/gov/nasa/jpl/mudrod/integration/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes the integration method for web log, ontology, and metadata
+ * mining results.
+ */
+package gov.nasa.jpl.mudrod.integration;
\ No newline at end of file