Posted to commits@sdap.apache.org by le...@apache.org on 2017/12/19 14:13:07 UTC

[06/17] incubator-sdap-mudrod git commit: SDAP-7 Change all package namespaces to org.apache.sdap

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
new file mode 100644
index 0000000..8259ce7
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/MatrixUtil.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.utils;
+
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.mllib.feature.IDF;
+import org.apache.spark.mllib.feature.IDFModel;
+import org.apache.spark.mllib.linalg.*;
+import org.apache.spark.mllib.linalg.distributed.IndexedRow;
+import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+import scala.Tuple2;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Matrix utility tool
+ */
+public class MatrixUtil {
+
+  private MatrixUtil() {
+  }
+
+  /**
+   * buildSVDMatrix: Generate SVD matrix from TF-IDF matrix. Please make sure
+   * the TF-IDF matrix has already been built from the original documents.
+   *
+   * @param tfidfMatrix
+   *          each row is a term, each column is a document name, and each
+   *          cell is the TF-IDF value of the term in the corresponding
+   *          document.
+   * @param dimension
+   *          Column number of the SVD matrix
+   * @return RowMatrix, each row is a term and each column is a dimension in the
+   *         feature space; each cell is the value of the term in the corresponding
+   *         dimension.
+   */
+  public static RowMatrix buildSVDMatrix(RowMatrix tfidfMatrix, int dimension) {
+    int matrixCol = (int) tfidfMatrix.numCols();
+    if (matrixCol < dimension) {
+      dimension = matrixCol;
+    }
+
+    SingularValueDecomposition<RowMatrix, Matrix> svd = tfidfMatrix.computeSVD(dimension, true, 1.0E-9d);
+    RowMatrix u = svd.U();
+    Vector s = svd.s();
+    return u.multiply(Matrices.diag(s));
+  }
+
+  /**
+   * buildSVDMatrix: Generate SVD matrix from Vector RDD.
+   *
+   * @param vecRDD
+   *          vectors of terms in feature space
+   * @param dimension
+   *          Column number of the SVD matrix
+   * @return RowMatrix, each row is a term and each column is a dimension in the
+   *         feature space; each cell is the value of the term in the corresponding
+   *         dimension.
+   */
+  public static RowMatrix buildSVDMatrix(JavaRDD<Vector> vecRDD, int dimension) {
+    RowMatrix tfidfMatrix = new RowMatrix(vecRDD.rdd());
+    SingularValueDecomposition<RowMatrix, Matrix> svd = tfidfMatrix.computeSVD(dimension, true, 1.0E-9d);
+    RowMatrix u = svd.U();
+    Vector s = svd.s();
+    return u.multiply(Matrices.diag(s));
+  }
+
+  /**
+   * Create TF-IDF matrix from word-doc matrix.
+   *
+   * @param wordDocMatrix
+   *          each row is a term, each column is a document name, and each cell
+   *          is the number of occurrences of the term in the corresponding document.
+   * @return RowMatrix, each row is a term and each column is a document name
+   *         and each cell is the TF-IDF value of the term in the corresponding
+   *         document.
+   */
+  public static RowMatrix createTFIDFMatrix(RowMatrix wordDocMatrix) {
+    JavaRDD<Vector> newcountRDD = wordDocMatrix.rows().toJavaRDD();
+    IDFModel idfModel = new IDF().fit(newcountRDD);
+    JavaRDD<Vector> idf = idfModel.transform(newcountRDD);
+    return new RowMatrix(idf.rdd());
+  }
+
+  /**
+   * Create matrix from doc-terms JavaPairRDD.
+   *
+   * @param uniqueDocRDD
+   *          doc-terms JavaPairRDD, in which each key is a doc name, and value
+   *          is term list extracted from that doc
+   * @return the resulting {@link LabeledRowMatrix}
+   */
+  public static LabeledRowMatrix createWordDocMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD) {
+    // Index documents with unique IDs
+    JavaPairRDD<List<String>, Long> corpus = uniqueDocRDD.values().zipWithIndex();
+    // count occurrences of each (term, doc id) pair
+    JavaPairRDD<Tuple2<String, Long>, Double> worddocNumRDD = corpus.flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Long>, Tuple2<String, Long>, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<Tuple2<Tuple2<String, Long>, Double>> call(Tuple2<List<String>, Long> docwords) throws Exception {
+        List<Tuple2<Tuple2<String, Long>, Double>> pairs = new ArrayList<>();
+        List<String> words = docwords._1;
+        int n = words.size();
+        for (int i = 0; i < n; i++) {
+          Tuple2<String, Long> worddoc = new Tuple2<>(words.get(i), docwords._2);
+          pairs.add(new Tuple2<Tuple2<String, Long>, Double>(worddoc, 1.0));
+        }
+        return pairs.iterator();
+      }
+    }).reduceByKey(new Function2<Double, Double, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Double call(Double first, Double second) throws Exception {
+        return first + second;
+      }
+    });
+    // re-key by term: (term, ([doc id], [count]))
+    JavaPairRDD<String, Tuple2<List<Long>, List<Double>>> wordDocnumRDD = worddocNumRDD
+        .mapToPair(new PairFunction<Tuple2<Tuple2<String, Long>, Double>, String, Tuple2<List<Long>, List<Double>>>() {
+          /**
+           *
+           */
+          private static final long serialVersionUID = 1L;
+
+          @Override
+          public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<Tuple2<String, Long>, Double> worddocNum) throws Exception {
+            List<Long> docs = new ArrayList<>();
+            docs.add(worddocNum._1._2);
+            List<Double> nums = new ArrayList<>();
+            nums.add(worddocNum._2);
+            Tuple2<List<Long>, List<Double>> docnums = new Tuple2<>(docs, nums);
+            return new Tuple2<>(worddocNum._1._1, docnums);
+          }
+        });
+    // merge per-term doc id/count lists and build one sparse vector per term
+    final int corporsize = (int) uniqueDocRDD.keys().count();
+    JavaPairRDD<String, Vector> wordVectorRDD = wordDocnumRDD.reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<List<Long>, List<Double>> call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception {
+        arg0._1.addAll(arg1._1);
+        arg0._2.addAll(arg1._2);
+        return new Tuple2<>(arg0._1, arg0._2);
+      }
+    }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception {
+        int docsize = arg0._2._1.size();
+        int[] intArray = new int[docsize];
+        double[] doubleArray = new double[docsize];
+        for (int i = 0; i < docsize; i++) {
+          intArray[i] = arg0._2._1.get(i).intValue();
+          doubleArray[i] = arg0._2._2.get(i).doubleValue();
+        }
+        Vector sv = Vectors.sparse(corporsize, intArray, doubleArray);
+        return new Tuple2<>(arg0._1, sv);
+      }
+    });
+
+    RowMatrix wordDocMatrix = new RowMatrix(wordVectorRDD.values().rdd());
+
+    LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix();
+    labeledRowMatrix.rowMatrix = wordDocMatrix;
+    labeledRowMatrix.rowkeys = wordVectorRDD.keys().collect();
+    labeledRowMatrix.colkeys = uniqueDocRDD.keys().collect();
+    return labeledRowMatrix;
+  }
+
+  public static LabeledRowMatrix createDocWordMatrix(JavaPairRDD<String, List<String>> uniqueDocRDD, JavaSparkContext sc) {
+    // Index word with unique IDs
+    JavaPairRDD<String, Long> wordIDRDD = uniqueDocRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<String> call(List<String> arg0) throws Exception {
+        return arg0.iterator();
+      }
+    }).distinct().zipWithIndex();
+
+    // count occurrences of each (doc, term) pair
+    JavaPairRDD<Tuple2<String, String>, Double> docwordNumRDD = uniqueDocRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, List<String>>, Tuple2<String, String>, Double>() {
+
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<Tuple2<Tuple2<String, String>, Double>> call(Tuple2<String, List<String>> docwords) throws Exception {
+        List<Tuple2<Tuple2<String, String>, Double>> pairs = new ArrayList<>();
+        List<String> words = docwords._2;
+        int n = words.size();
+        for (int i = 0; i < n; i++) {
+          Tuple2<String, String> worddoc = new Tuple2<>(docwords._1, words.get(i));
+          pairs.add(new Tuple2<Tuple2<String, String>, Double>(worddoc, 1.0));
+        }
+        return pairs.iterator();
+      }
+    }).reduceByKey(new Function2<Double, Double, Double>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Double call(Double first, Double second) throws Exception {
+        return first + second;
+      }
+    });
+
+    // re-key by term: (term, (doc, count))
+    JavaPairRDD<String, Tuple2<String, Double>> wordDocnumRDD = docwordNumRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Double>, String, Tuple2<String, Double>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Tuple2<String, Double>> call(Tuple2<Tuple2<String, String>, Double> arg0) throws Exception {
+
+        Tuple2<String, Double> wordmums = new Tuple2<>(arg0._1._1, arg0._2);
+        return new Tuple2<>(arg0._1._2, wordmums);
+      }
+    });
+
+    // join each (term, (doc, count)) pair with the term's numeric id
+
+    JavaPairRDD<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> testRDD = wordDocnumRDD.leftOuterJoin(wordIDRDD);
+
+    int wordsize = (int) wordIDRDD.count();
+    JavaPairRDD<String, Vector> docVectorRDD = testRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>>, String, Tuple2<List<Long>, List<Double>>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Tuple2<List<Long>, List<Double>>> call(Tuple2<String, Tuple2<Tuple2<String, Double>, Optional<Long>>> arg0) throws Exception {
+        Optional<Long> oid = arg0._2._2;
+        Long wordId = (long) 0;
+        if (oid.isPresent()) {
+          wordId = oid.get();
+        }
+
+        List<Long> word = new ArrayList<>();
+        word.add(wordId);
+
+        List<Double> count = new ArrayList<>();
+        count.add(arg0._2._1._2);
+
+        Tuple2<List<Long>, List<Double>> wordcount = new Tuple2<>(word, count);
+
+        return new Tuple2<>(arg0._2._1._1, wordcount);
+      }
+
+    }).reduceByKey(new Function2<Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>, Tuple2<List<Long>, List<Double>>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<List<Long>, List<Double>> call(Tuple2<List<Long>, List<Double>> arg0, Tuple2<List<Long>, List<Double>> arg1) throws Exception {
+        arg0._1.addAll(arg1._1);
+        arg0._2.addAll(arg1._2);
+        return new Tuple2<>(arg0._1, arg0._2);
+      }
+    }).mapToPair(new PairFunction<Tuple2<String, Tuple2<List<Long>, List<Double>>>, String, Vector>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Vector> call(Tuple2<String, Tuple2<List<Long>, List<Double>>> arg0) throws Exception {
+        int docsize = arg0._2._1.size();
+        int[] intArray = new int[docsize];
+        double[] doubleArray = new double[docsize];
+        for (int i = 0; i < docsize; i++) {
+          intArray[i] = arg0._2._1.get(i).intValue();
+          doubleArray[i] = arg0._2._2.get(i).doubleValue();
+        }
+        Vector sv = Vectors.sparse(wordsize, intArray, doubleArray);
+        return new Tuple2<>(arg0._1, sv);
+      }
+    });
+
+    RowMatrix docwordMatrix = new RowMatrix(docVectorRDD.values().rdd());
+
+    LabeledRowMatrix labeledRowMatrix = new LabeledRowMatrix();
+    labeledRowMatrix.rowMatrix = docwordMatrix;
+    labeledRowMatrix.rowkeys = docVectorRDD.keys().collect();
+    labeledRowMatrix.colkeys = wordIDRDD.keys().collect();
+
+    return labeledRowMatrix;
+  }
+
+  /**
+   * loadVectorFromCSV: Load term vector from csv file.
+   *
+   * @param spark
+   *          spark instance
+   * @param csvFileName
+   *          csv matrix file
+   * @param skipNum
+   *          the number of rows to skip at the top of the CSV file
+   * @return JavaPairRDD, each key is a term, and value is the vector of the
+   *         term in feature space.
+   */
+  public static JavaPairRDD<String, Vector> loadVectorFromCSV(SparkDriver spark, String csvFileName, int skipNum) {
+    // skip the first skipNum lines (header rows), important!
+    JavaRDD<String> importRDD = spark.sc.textFile(csvFileName);
+    JavaPairRDD<String, Long> importIdRDD = importRDD.zipWithIndex().filter(new Function<Tuple2<String, Long>, Boolean>() {
+      /** */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Boolean call(Tuple2<String, Long> v1) throws Exception {
+        return v1._2 > (skipNum - 1);
+      }
+    });
+
+    if (importIdRDD.count() == 0) {
+      return null;
+    }
+
+    return importIdRDD.mapToPair(new PairFunction<Tuple2<String, Long>, String, Vector>() {
+      /** */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<String, Vector> call(Tuple2<String, Long> t) throws Exception {
+        String[] fields = t._1.split(",");
+        String word = fields[0];
+        int fieldsize = fields.length;
+        int nStart = 1;
+        int nEnd = fieldsize - 1;
+        if (fieldsize < 2) {
+          nStart = 0;
+          nEnd = 0;
+        }
+        String[] numfields = Arrays.copyOfRange(fields, nStart, nEnd);
+
+        double[] nums = Stream.of(numfields).mapToDouble(Double::parseDouble).toArray();
+        Vector vec = Vectors.dense(nums);
+        return new Tuple2<>(word, vec);
+      }
+    });
+  }
+
+  /**
+   * Convert vectorRDD to indexed row matrix.
+   *
+   * @param vecs
+   *          Vector RDD
+   * @return IndexedRowMatrix
+   */
+  public static IndexedRowMatrix buildIndexRowMatrix(JavaRDD<Vector> vecs) {
+    JavaRDD<IndexedRow> indexrows = vecs.zipWithIndex().map(new Function<Tuple2<Vector, Long>, IndexedRow>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public IndexedRow call(Tuple2<Vector, Long> docId) {
+        return new IndexedRow(docId._2, docId._1);
+      }
+    });
+    return new IndexedRowMatrix(indexrows.rdd());
+  }
+
+  /**
+   * Transpose matrix
+   *
+   * @param indexedMatrix
+   *          spark indexed matrix
+   * @return RowMatrix in which each row corresponds to a column of the original
+   *         matrix and vice versa
+   */
+  public static RowMatrix transposeMatrix(IndexedRowMatrix indexedMatrix) {
+    return indexedMatrix.toCoordinateMatrix().transpose().toRowMatrix();
+  }
+
+  /**
+   * Output matrix to a CSV file.
+   *
+   * @param matrix
+   *          spark row matrix
+   * @param rowKeys
+   *          matrix row names
+   * @param colKeys
+   *          matrix column names
+   * @param fileName
+   *          csv file name
+   */
+  public static void exportToCSV(RowMatrix matrix, List<String> rowKeys, List<String> colKeys, String fileName) {
+
+    if (matrix.rows().isEmpty()) {
+      return;
+    }
+
+    int rownum = (int) matrix.numRows();
+    int colnum = (int) matrix.numCols();
+    List<Vector> rows = matrix.rows().toJavaRDD().collect();
+
+    File file = new File(fileName);
+    if (file.exists()) {
+      file.delete();
+    }
+    try {
+      file.createNewFile();
+      // try-with-resources ensures the writer is closed even if writing fails
+      try (BufferedWriter bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()))) {
+        StringBuilder coltitle = new StringBuilder(" Num");
+        for (int j = 0; j < colnum; j++) {
+          coltitle.append(",\"").append(colKeys.get(j)).append("\"");
+        }
+        bw.write(coltitle.toString() + "\n");
+
+        for (int i = 0; i < rownum; i++) {
+          double[] rowValue = rows.get(i).toArray();
+          StringBuilder row = new StringBuilder(rowKeys.get(i));
+          for (int j = 0; j < colnum; j++) {
+            row.append(",").append(rowValue[j]);
+          }
+          bw.write(row.toString() + "\n");
+        }
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}
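
For context, a minimal sketch of how the utilities above chain together, assuming a
JavaPairRDD<String, List<String>> of document terms named docTermRDD already exists
(the SVD dimension of 50 is illustrative):

    // word-doc counts -> TF-IDF weighting -> 50-dimensional SVD space
    LabeledRowMatrix wordDoc = MatrixUtil.createWordDocMatrix(docTermRDD);
    RowMatrix tfidf = MatrixUtil.createTFIDFMatrix(wordDoc.rowMatrix);
    RowMatrix svd = MatrixUtil.buildSVDMatrix(tfidf, 50);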

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java
new file mode 100644
index 0000000..8c2e64c
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/RDDUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.utils;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * RDDUtil: common utility methods for Mudrod Spark RDDs
+ */
+public class RDDUtil {
+
+  public RDDUtil() {
+  }
+
+  /**
+   * getAllWordsInDoc: Extract all unique terms from all docs.
+   *
+   * @param docwordRDD Pair RDD, each key is a doc, and value is term list extracted from
+   *                   that doc.
+   * @return unique term list
+   */
+  public static JavaRDD<String> getAllWordsInDoc(JavaPairRDD<String, List<String>> docwordRDD) {
+    JavaRDD<String> wordRDD = docwordRDD.values().flatMap(new FlatMapFunction<List<String>, String>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Iterator<String> call(List<String> list) {
+        return list.iterator();
+      }
+    }).distinct();
+
+    return wordRDD;
+  }
+}
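
A similarly minimal sketch for RDDUtil, using the same hypothetical docTermRDD as above:

    JavaRDD<String> vocabulary = RDDUtil.getAllWordsInDoc(docTermRDD);
    long vocabularySize = vocabulary.count();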

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java
new file mode 100644
index 0000000..1cd4e00
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/SVDUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.utils;
+
+import org.apache.sdap.mudrod.discoveryengine.MudrodAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Singular value decomposition
+ */
+public class SVDUtil extends MudrodAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  // wordRDD: terms extracted from all documents
+  JavaRDD<String> wordRDD;
+  // svdMatrix: svd matrix
+  private RowMatrix svdMatrix;
+  // simMatrix: similarity matrix
+  private CoordinateMatrix simMatrix;
+
+  /**
+   * Creates a new instance of SVDUtil.
+   *
+   * @param config the Mudrod configuration
+   * @param es     the Elasticsearch driver
+   * @param spark  the spark driver
+   */
+  public SVDUtil(Properties config, ESDriver es, SparkDriver spark) {
+    super(config, es, spark);
+  }
+
+  /**
+   * Build SVD matrix from document-term pairs.
+   *
+   * @param docwordRDD    JavaPairRDD whose key is the short name of a data set and whose
+   *                      values are the terms in the corresponding data set
+   * @param svdDimension  dimension of the matrix after singular value decomposition
+   * @return row matrix
+   */
+  public RowMatrix buildSVDMatrix(JavaPairRDD<String, List<String>> docwordRDD, int svdDimension) {
+
+    RowMatrix svdMatrix = null;
+    LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(docwordRDD);
+    RowMatrix tfIdfMatrix = MatrixUtil.createTFIDFMatrix(wordDocMatrix.rowMatrix);
+    svdMatrix = MatrixUtil.buildSVDMatrix(tfIdfMatrix, svdDimension);
+    this.svdMatrix = svdMatrix;
+    this.wordRDD = RDDUtil.getAllWordsInDoc(docwordRDD);
+    return svdMatrix;
+  }
+
+  /**
+   * Build svd matrix from CSV file.
+   *
+   * @param tfidfCSVfile  tf-idf matrix csv file
+   * @param svdDimension  dimension of the matrix after singular value decomposition
+   * @return row matrix
+   */
+  public RowMatrix buildSVDMatrix(String tfidfCSVfile, int svdDimension) {
+    RowMatrix svdMatrix = null;
+    JavaPairRDD<String, Vector> tfidfRDD = MatrixUtil.loadVectorFromCSV(spark, tfidfCSVfile, 2);
+    JavaRDD<Vector> vectorRDD = tfidfRDD.values();
+
+    svdMatrix = MatrixUtil.buildSVDMatrix(vectorRDD, svdDimension);
+    this.svdMatrix = svdMatrix;
+
+    this.wordRDD = tfidfRDD.keys();
+
+    return svdMatrix;
+  }
+
+  /**
+   * Calculate similarity
+   */
+  public void calSimilarity() {
+    CoordinateMatrix simMatrix = SimilarityUtil.calculateSimilarityFromMatrix(svdMatrix);
+    this.simMatrix = simMatrix;
+  }
+
+  /**
+   * Insert linkage triples to elasticsearch
+   *
+   * @param index index name
+   * @param type  linkage triple name
+   */
+  public void insertLinkageToES(String index, String type) {
+    List<LinkageTriple> triples = SimilarityUtil.matrixToTriples(wordRDD, simMatrix);
+    try {
+      LinkageTriple.insertTriples(es, triples, index, type);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}
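
A minimal usage sketch for SVDUtil, assuming configured Properties, ESDriver and
SparkDriver instances and the hypothetical docTermRDD from above (the dimension,
index and type names are illustrative):

    SVDUtil svdUtil = new SVDUtil(props, es, spark);
    svdUtil.buildSVDMatrix(docTermRDD, 50);
    svdUtil.calSimilarity();
    svdUtil.insertLinkageToES("mudrod", "word_sim");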

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java b/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java
new file mode 100644
index 0000000..6fdc66d
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/SimilarityUtil.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.utils;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
+import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
+import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
+import org.apache.spark.mllib.linalg.distributed.RowMatrix;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * Similarity and distance calculation utilities
+ */
+public class SimilarityUtil {
+
+  public static final int SIM_COSINE = 3;
+  public static final int SIM_HELLINGER = 2;
+  public static final int SIM_PEARSON = 1;
+  /**
+   * calculateSimilarityFromMatrix: Calculate term similarity from a matrix.
+   *
+   * @param svdMatrix each row corresponds to a term, and each column
+   *                  corresponds to a dimension of the feature space
+   * @return CoordinateMatrix in which each row and each column corresponds to a
+   * term, and the cell value is the similarity between the two terms
+   */
+  public static CoordinateMatrix calculateSimilarityFromMatrix(RowMatrix svdMatrix) {
+    JavaRDD<Vector> vecs = svdMatrix.rows().toJavaRDD();
+    return SimilarityUtil.calculateSimilarityFromVector(vecs);
+  }
+
+  /**
+   * calculateSimilarityFromVector: Calculate term similarity from vectors.
+   *
+   * @param vecs each vector corresponds to a term in the feature space.
+   * @return CoordinateMatrix in which each row and each column corresponds to a
+   * term, and the cell value is the similarity between the two terms
+   */
+  public static CoordinateMatrix calculateSimilarityFromVector(JavaRDD<Vector> vecs) {
+    IndexedRowMatrix indexedMatrix = MatrixUtil.buildIndexRowMatrix(vecs);
+    RowMatrix transposeMatrix = MatrixUtil.transposeMatrix(indexedMatrix);
+    return transposeMatrix.columnSimilarities();
+  }
+
+  /**
+   * Calculate term similarity from vector.
+   *
+   * @param importRDD the {@link org.apache.spark.api.java.JavaPairRDD}
+   *                  data structure containing the vectors.
+   * @param simType   the similarity calculation to execute e.g. 
+   * <ul>
+   * <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_COSINE} - 3,</li>
+   * <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_HELLINGER} - 2,</li>
+   * <li>{@link org.apache.sdap.mudrod.utils.SimilarityUtil#SIM_PEARSON} - 1</li>
+   * </ul>
+   * @return a new {@link org.apache.spark.api.java.JavaRDD} of {@link LinkageTriple} elements
+   */
+  public static JavaRDD<LinkageTriple> calculateSimilarityFromVector(JavaPairRDD<String, Vector> importRDD, int simType) {
+    JavaRDD<Tuple2<String, Vector>> importRDD1 = importRDD.map(f -> new Tuple2<String, Vector>(f._1, f._2));
+    JavaPairRDD<Tuple2<String, Vector>, Tuple2<String, Vector>> cartesianRDD = importRDD1.cartesian(importRDD1);
+
+    return cartesianRDD.map(new Function<Tuple2<Tuple2<String, Vector>, Tuple2<String, Vector>>, LinkageTriple>() {
+
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public LinkageTriple call(Tuple2<Tuple2<String, Vector>, Tuple2<String, Vector>> arg) {
+        String keyA = arg._1._1;
+        String keyB = arg._2._1;
+
+        if (keyA.equals(keyB)) {
+          return null;
+        }
+
+        Vector vecA = arg._1._2;
+        Vector vecB = arg._2._2;
+        Double weight = 0.0;
+
+        if (simType == SimilarityUtil.SIM_PEARSON) {
+          weight = SimilarityUtil.pearsonDistance(vecA, vecB);
+        } else if (simType == SimilarityUtil.SIM_HELLINGER) {
+          weight = SimilarityUtil.hellingerDistance(vecA, vecB);
+        }
+
+        LinkageTriple triple = new LinkageTriple();
+        triple.keyA = keyA;
+        triple.keyB = keyB;
+        triple.weight = weight;
+        return triple;
+      }
+    }).filter(new Function<LinkageTriple, Boolean>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Boolean call(LinkageTriple arg0) throws Exception {
+        return arg0 != null;
+      }
+    });
+  }
+
+  /**
+   * matrixToTriples: Convert a term similarity matrix to a linkage triple list.
+   *
+   * @param keys      each key is a term
+   * @param simMatirx term similarity matrix, in which each row and column is a term and
+   *                  the cell value is the similarity between the two terms
+   * @return linkage triple list
+   */
+  public static List<LinkageTriple> matrixToTriples(JavaRDD<String> keys, CoordinateMatrix simMatirx) {
+    if (simMatirx.numCols() != keys.count()) {
+      return null;
+    }
+
+    // index words
+    JavaPairRDD<Long, String> keyIdRDD = JavaPairRDD.fromJavaRDD(keys.zipWithIndex().map(new Function<Tuple2<String, Long>, Tuple2<Long, String>>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<Long, String> call(Tuple2<String, Long> docId) {
+        return docId.swap();
+      }
+    }));
+
+    JavaPairRDD<Long, LinkageTriple> entriesRowRDD = simMatirx.entries().toJavaRDD().mapToPair(new PairFunction<MatrixEntry, Long, LinkageTriple>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<Long, LinkageTriple> call(MatrixEntry t) throws Exception {
+        LinkageTriple triple = new LinkageTriple();
+        triple.keyAId = t.i();
+        triple.keyBId = t.j();
+        triple.weight = t.value();
+        return new Tuple2<>(triple.keyAId, triple);
+      }
+    });
+
+    JavaPairRDD<Long, LinkageTriple> entriesColRDD = entriesRowRDD.leftOuterJoin(keyIdRDD).values().mapToPair(new PairFunction<Tuple2<LinkageTriple, Optional<String>>, Long, LinkageTriple>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Tuple2<Long, LinkageTriple> call(Tuple2<LinkageTriple, Optional<String>> t) throws Exception {
+        LinkageTriple triple = t._1;
+        Optional<String> stra = t._2;
+        if (stra.isPresent()) {
+          triple.keyA = stra.get();
+        }
+        return new Tuple2<>(triple.keyBId, triple);
+      }
+    });
+
+    JavaRDD<LinkageTriple> tripleRDD = entriesColRDD.leftOuterJoin(keyIdRDD).values().map(new Function<Tuple2<LinkageTriple, Optional<String>>, LinkageTriple>() {
+      /**
+       *
+       */
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public LinkageTriple call(Tuple2<LinkageTriple, Optional<String>> t) throws Exception {
+        LinkageTriple triple = t._1;
+        Optional<String> strb = t._2;
+        if (strb.isPresent()) {
+          triple.keyB = strb.get();
+        }
+        return triple;
+      }
+    });
+    return tripleRDD.collect();
+  }
+
+  /**
+   * Calculate similarity (Hellinger distance) between vectors. Note that this
+   * implementation returns the sum of squared differences of the element-wise
+   * square roots, scaled by 1/sqrt(2), i.e. a quantity proportional to the
+   * squared Hellinger distance rather than the distance itself.
+   *
+   * @param vecA initial vector from which to calculate a similarity
+   * @param vecB second vector involved in similarity calculation
+   * @return similarity between two vectors
+   */
+  public static double hellingerDistance(Vector vecA, Vector vecB) {
+    double[] arrA = vecA.toArray();
+    double[] arrB = vecB.toArray();
+
+    double sim = 0.0;
+
+    int arrsize = arrA.length;
+    for (int i = 0; i < arrsize; i++) {
+      double a = arrA[i];
+      double b = arrB[i];
+      double sqrtDiff = Math.sqrt(a) - Math.sqrt(b);
+      sim += sqrtDiff * sqrtDiff;
+    }
+
+    sim = sim / Math.sqrt(2);
+
+    return sim;
+  }
+
+  /**
+   * Calculate similarity (Pearson distance) between vectors. In this
+   * implementation the value is the number of dimensions in which both vectors
+   * are positive, normalized by the geometric mean of each vector's count of
+   * positive dimensions.
+   *
+   * @param vecA initial vector from which to calculate a similarity
+   * @param vecB second vector involved in similarity calculation
+   * @return similarity between two vectors
+   */
+  public static double pearsonDistance(Vector vecA, Vector vecB) {
+    double[] arrA = vecA.toArray();
+    double[] arrB = vecB.toArray();
+
+    int viewA = 0;
+    int viewB = 0;
+    int viewAB = 0;
+
+    int arrsize = arrA.length;
+    for (int i = 0; i < arrsize; i++) {
+      if (arrA[i] > 0) {
+        viewA++;
+      }
+
+      if (arrB[i] > 0) {
+        viewB++;
+      }
+
+      if (arrB[i] > 0 && arrA[i] > 0) {
+        viewAB++;
+      }
+    }
+    return viewAB / (Math.sqrt(viewA) * Math.sqrt(viewB));
+  }
+
+  /**
+   * Calculate cosine similarity between vectors (currently a placeholder implementation)
+   *
+   * @param vecA initial vector from which to calculate a similarity
+   * @param vecB second vector involved in similarity calculation
+   * @return similarity between two vectors
+   */
+  public static double cosineDistance(Vector vecA, Vector vecB) {
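+    // NOTE: placeholder implementation; this always returns a constant value
+    // and does not yet compute a real cosine similarity between vecA and vecB.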
+    return 1;
+  }
+}
\ No newline at end of file
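
A minimal sketch of the pairwise similarity path above, assuming a hypothetical
JavaPairRDD<String, Vector> termVectorRDD (for example, the output of
MatrixUtil.loadVectorFromCSV):

    JavaRDD<LinkageTriple> triples =
        SimilarityUtil.calculateSimilarityFromVector(termVectorRDD, SimilarityUtil.SIM_HELLINGER);

    // the distance helpers can also be called directly on two mllib vectors
    double d = SimilarityUtil.hellingerDistance(Vectors.dense(0.2, 0.8), Vectors.dense(0.5, 0.5));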

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java
new file mode 100644
index 0000000..1adb0b9
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes utility classes for calculating similarity and
+ * parsing HTTP requests.
+ */
+package org.apache.sdap.mudrod.utils;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java
new file mode 100644
index 0000000..9c87a7d
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package includes web log pre-processing, processing, and data structure
+ * classes.
+ */
+package org.apache.sdap.mudrod.weblog;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java
new file mode 100644
index 0000000..8f4e263
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/KGreedyPartitionSolver.java
@@ -0,0 +1,142 @@
+package org.apache.sdap.mudrod.weblog.partition;
+
+import java.util.*;
+
+public class KGreedyPartitionSolver implements ThePartitionProblemSolver {
+
+  public boolean bsorted = false;
+
+  public KGreedyPartitionSolver() {
+    // default constructor
+  }
+
+  public KGreedyPartitionSolver(boolean bsorted) {
+    this.bsorted = bsorted;
+  }
+
+  @Override
+  public Map<String, Integer> solve(Map<String, Double> labelNums, int k) {
+    List<Double> lista = null;
+    List<String> months = null;
+
+    if (!this.bsorted) {
+      LinkedHashMap<String, Double> sortedMap = this.sortMapByValue(labelNums);
+      lista = new ArrayList<>(sortedMap.values());
+      months = new ArrayList<>(sortedMap.keySet());
+    } else {
+      lista = new ArrayList<>(labelNums.values());
+      months = new ArrayList<>(labelNums.keySet());
+    }
+
+    List<List<Double>> parts = new ArrayList<>();
+    List<List<String>> splitMonths = new ArrayList<>();
+
+    for (int i = 0; i < k; i++) {
+      List<Double> part = new ArrayList<>();
+      parts.add(part);
+
+      List<String> monthList = new ArrayList<>();
+      splitMonths.add(monthList);
+    }
+
+    int j = 0;
+    for (Double lista1 : lista) {
+
+      Double minimalSum = -1.0;
+      int position = 0;
+      for (int i = 0; i < parts.size(); i++) {
+        List<Double> part = parts.get(i);
+        if (minimalSum == -1) {
+          minimalSum = Suma(part);
+          position = i;
+        } else if (Suma(part) < minimalSum) {
+          minimalSum = Suma(part);
+          position = i;
+        }
+      }
+
+      List<Double> part = parts.get(position);
+      part.add(lista1);
+      parts.set(position, part);
+
+      List<String> month = splitMonths.get(position);
+      month.add(months.get(j));
+      splitMonths.set(position, month);
+      j++;
+    }
+
+    /*  for(int i=0; i<splitMonths.size(); i++){
+        System.out.println("group:" + i);
+        printStrList(splitMonths.get(i));
+      }
+      
+      for(int i=0; i<parts.size(); i++){
+        print(parts.get(i));
+      }*/
+
+    Map<String, Integer> labelGroups = new HashMap<>();
+    for (int i = 0; i < splitMonths.size(); i++) {
+      List<String> list = splitMonths.get(i);
+      for (int m = 0; m < list.size(); m++) {
+        labelGroups.put(list.get(m), i);
+      }
+    }
+
+    return labelGroups;
+  }
+
+  public LinkedHashMap<String, Double> sortMapByValue(Map<String, Double> passedMap) {
+    // note: entries are removed from passedMap as they are placed into the sorted map
+    List<String> mapKeys = new ArrayList<>(passedMap.keySet());
+    List<Double> mapValues = new ArrayList<>(passedMap.values());
+    Collections.sort(mapValues, Collections.reverseOrder());
+    Collections.sort(mapKeys, Collections.reverseOrder());
+
+    LinkedHashMap<String, Double> sortedMap = new LinkedHashMap<>();
+
+    Iterator<Double> valueIt = mapValues.iterator();
+    while (valueIt.hasNext()) {
+      Double val = valueIt.next();
+      Iterator<String> keyIt = mapKeys.iterator();
+
+      while (keyIt.hasNext()) {
+        String key = keyIt.next();
+        String comp1 = passedMap.get(key).toString();
+        String comp2 = val.toString();
+
+        if (comp1.equals(comp2)) {
+          passedMap.remove(key);
+          mapKeys.remove(key);
+          sortedMap.put(key, val);
+          break;
+        }
+      }
+    }
+    return sortedMap;
+  }
+
+  private Double Suma(List<Double> part) {
+    Double ret = 0.0;
+    for (int i = 0; i < part.size(); i++) {
+      ret += part.get(i);
+    }
+    return ret;
+  }
+
+  private void print(List<Double> list) {
+    /*for (int i = 0; i < list.size(); i++) {
+        System.out.print(list.get(i)+",");
+    }*/
+    System.out.print("sum is:" + Suma(list));
+    System.out.println();
+  }
+
+  private void printStrList(List<String> list) {
+    for (int i = 0; i < list.size(); i++) {
+      System.out.print(list.get(i) + ",");
+    }
+    System.out.println();
+  }
+
+}
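
A minimal sketch of the greedy k-way balancing above, with illustrative per-user log
counts; each label is assigned to the group whose running total is currently smallest:

    Map<String, Double> userLogCounts = new HashMap<>();
    userLogCounts.put("userA", 120.0);
    userLogCounts.put("userB", 80.0);
    userLogCounts.put("userC", 60.0);
    userLogCounts.put("userD", 40.0);

    Map<String, Integer> groups = new KGreedyPartitionSolver().solve(userLogCounts, 2);
    // with these counts the heuristic should place userA and userD in one group
    // (sum 160) and userB and userC in the other (sum 140)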

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java
new file mode 100644
index 0000000..507140f
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/ThePartitionProblemSolver.java
@@ -0,0 +1,8 @@
+package org.apache.sdap.mudrod.weblog.partition;
+
+import java.util.Map;
+
+public interface ThePartitionProblemSolver {
+
+  public Map<String, Integer> solve(Map<String, Double> labelNums, int k);
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java
new file mode 100644
index 0000000..7ff2181
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/partition/logPartitioner.java
@@ -0,0 +1,33 @@
+package org.apache.sdap.mudrod.weblog.partition;
+
+import org.apache.spark.Partitioner;
+
+import java.util.Map;
+
+public class logPartitioner extends Partitioner {
+
+  int num;
+  Map<String, Integer> UserGroups;
+
+  public logPartitioner(int num) {
+    this.num = num;
+  }
+
+  public logPartitioner(Map<String, Integer> UserGroups, int num) {
+    this.UserGroups = UserGroups;
+    this.num = num;
+  }
+
+  @Override
+  public int getPartition(Object arg0) {
+    String user = (String) arg0;
+    return UserGroups.get(user);
+  }
+
+  @Override
+  public int numPartitions() {
+    return num;
+  }
+}
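
A minimal sketch of wiring the two classes together, assuming a hypothetical
JavaPairRDD<String, String> logsByUser keyed by user and the hypothetical
userLogCounts map from the sketch above:

    Map<String, Integer> userGroups = new KGreedyPartitionSolver().solve(userLogCounts, 4);
    JavaPairRDD<String, String> partitionedLogs = logsByUser.partitionBy(new logPartitioner(userGroups, 4));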

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java
new file mode 100644
index 0000000..e678854
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ClickStreamGenerator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.utils.LabeledRowMatrix;
+import org.apache.sdap.mudrod.utils.MatrixUtil;
+import org.apache.sdap.mudrod.weblog.structure.ClickStream;
+import org.apache.sdap.mudrod.weblog.structure.SessionExtractor;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Supports ability to extract click stream data based on session processing results
+ */
+public class ClickStreamGenerator extends DiscoveryStepAbstract {
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ClickStreamGenerator.class);
+
+  public ClickStreamGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting ClickStreamGenerator...");
+    startTime = System.currentTimeMillis();
+
+    String clickstreamMatrixFile = props.getProperty("clickstreamMatrix");
+    try {
+      SessionExtractor extractor = new SessionExtractor();
+      JavaRDD<ClickStream> clickstreamRDD = extractor.extractClickStreamFromES(this.props, this.es, this.spark);
+      int weight = Integer.parseInt(props.getProperty("downloadWeight"));
+      JavaPairRDD<String, List<String>> metadataQueryRDD = extractor.bulidDataQueryRDD(clickstreamRDD, weight);
+      LabeledRowMatrix wordDocMatrix = MatrixUtil.createWordDocMatrix(metadataQueryRDD);
+
+      MatrixUtil.exportToCSV(wordDocMatrix.rowMatrix, wordDocMatrix.rowkeys, wordDocMatrix.colkeys, clickstreamMatrixFile);
+    } catch (Exception e) {
+      LOG.error("Encountered error within ClickStreamGenerator: ", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    LOG.info("ClickStreamGenerator complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
new file mode 100644
index 0000000..79a014e
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/CrawlerDetection.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link DiscoveryStepAbstract}
+ * implementation which detects a known list of Web crawlers which may be
+ * present within, and pollute, various logs acting as input to Mudrod.
+ */
+public class CrawlerDetection extends LogAbstract {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(CrawlerDetection.class);
+
+  public static final String CRAWLER = "crawler";
+  public static final String GOOGLE_BOT = "googlebot";
+  public static final String BING_BOT = "bingbot";
+  public static final String YAHOO_BOT = "slurp";
+  public static final String YACY_BOT = "yacybot";
+  public static final String ROGER_BOT = "rogerbot";
+  public static final String YANDEX_BOT = "yandexbot";
+
+  public static final String NO_AGENT_BOT = "-";
+  public static final String PERL_BOT = "libwww-perl/";
+  public static final String APACHE_HHTP = "apache-httpclient/";
+  public static final String JAVA_CLIENT = "java/";
+  public static final String CURL = "curl/";
+
+  /**
+   * Parameterized constructor to instantiate a configured instance of
+   * {@link CrawlerDetection}
+   *
+   * @param props populated {@link java.util.Properties} object
+   * @param es    {@link ESDriver} object to use in
+   *              crawler detection preprocessing.
+   * @param spark {@link SparkDriver} object to use in
+   *              crawler detection preprocessing.
+   */
+  public CrawlerDetection(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  public CrawlerDetection() {
+    super(null, null, null);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting Crawler detection {}.", httpType);
+    startTime = System.currentTimeMillis();
+    try {
+      checkByRate();
+    } catch (InterruptedException | IOException e) {
+      LOG.error("Encountered an error whilst detecting Web crawlers.", e);
+    }
+    endTime = System.currentTimeMillis();
+    es.refreshIndex();
+    LOG.info("Crawler detection complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  /**
+   * Check whether an agent string matches a known crawler agent name
+   *
+   * @param agent agent name from a log line
+   * @return true if the log entry was initiated by a crawler, false otherwise
+   */
+  public boolean checkKnownCrawler(String agent) {
+    agent = agent.toLowerCase();
+    return agent.contains(CRAWLER) || agent.contains(GOOGLE_BOT) || agent.contains(BING_BOT) || agent.contains(YAHOO_BOT) || agent.contains(YANDEX_BOT)
+        || agent.contains(NO_AGENT_BOT) || agent.contains(PERL_BOT) || agent.contains(APACHE_HHTP) || agent.contains(JAVA_CLIENT) || agent.contains(CURL);
+  }
+
+  public void checkByRate() throws InterruptedException, IOException {
+    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE);
+    if (processingType.equals("sequential")) {
+      checkByRateInSequential();
+    } else if (processingType.equals("parallel")) {
+      checkByRateInParallel();
+    }
+  }
+
+  /**
+   * Check crawler by request sending rate, which is read from the configuration
+   * file
+   *
+   * @throws InterruptedException InterruptedException
+   * @throws IOException          IOException
+   */
+  public void checkByRateInSequential() throws InterruptedException, IOException {
+    es.createBulkProcessor();
+
+    int rate = Integer.parseInt(props.getProperty("sendingrate"));
+
+    Terms users = this.getUserTerms(this.httpType);
+    LOG.info("Original User count: {}", Integer.toString(users.getBuckets().size()));
+
+    int userCount = 0;
+    for (Terms.Bucket entry : users.getBuckets()) {
+      String user = entry.getKey().toString();
+      int count = checkByRate(es, user);
+      userCount += count;
+    }
+    es.destroyBulkProcessor();
+    LOG.info("User count: {}", Integer.toString(userCount));
+  }
+
+  void checkByRateInParallel() throws InterruptedException, IOException {
+
+    JavaRDD<String> userRDD = getUserRDD(this.httpType);
+    LOG.info("Original User count: {}", userRDD.count());
+
+    int userCount = 0;
+    userCount = userRDD.mapPartitions((FlatMapFunction<Iterator<String>, Integer>) iterator -> {
+      ESDriver tmpES = new ESDriver(props);
+      tmpES.createBulkProcessor();
+      List<Integer> realUserNums = new ArrayList<>();
+      while (iterator.hasNext()) {
+        String s = iterator.next();
+        Integer realUser = checkByRate(tmpES, s);
+        realUserNums.add(realUser);
+      }
+      tmpES.destroyBulkProcessor();
+      tmpES.close();
+      return realUserNums.iterator();
+    }).reduce((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
+
+    LOG.info("User count: {}", Integer.toString(userCount));
+  }
+
+  private int checkByRate(ESDriver es, String user) {
+
+    int rate = Integer.parseInt(props.getProperty("sendingrate"));
+    Pattern pattern = Pattern.compile("get (.*?) http/*");
+    Matcher matcher;
+
+    BoolQueryBuilder filterSearch = new BoolQueryBuilder();
+    filterSearch.must(QueryBuilders.termQuery("IP", user));
+
+    AggregationBuilder aggregation = AggregationBuilders.dateHistogram("by_minute").field("Time").dateHistogramInterval(DateHistogramInterval.MINUTE).order(Order.COUNT_DESC);
+    SearchResponse checkRobot = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setQuery(filterSearch).setSize(0).addAggregation(aggregation).execute().actionGet();
+
+    Histogram agg = checkRobot.getAggregations().get("by_minute");
+
+    List<? extends Histogram.Bucket> botList = agg.getBuckets();
+    long maxCount = botList.get(0).getDocCount();
+    if (maxCount >= rate) {
+      return 0;
+    } else {
+      DateTime dt1 = null;
+      int toLast = 0;
+      SearchResponse scrollResp = es.getClient().prepareSearch(logIndex).setTypes(httpType, ftpType).setScroll(new TimeValue(60000)).setQuery(filterSearch).setSize(100).execute().actionGet();
+      while (true) {
+        for (SearchHit hit : scrollResp.getHits().getHits()) {
+          Map<String, Object> result = hit.getSource();
+          String logtype = (String) result.get("LogType");
+          if (logtype.equals("PO.DAAC")) {
+            String request = (String) result.get("Request");
+            matcher = pattern.matcher(request.trim().toLowerCase());
+            boolean find = false;
+            while (matcher.find()) {
+              request = matcher.group(1);
+              result.put("RequestUrl", "http://podaac.jpl.nasa.gov" + request);
+              find = true;
+            }
+            if (!find) {
+              result.put("RequestUrl", request);
+            }
+          } else {
+            result.put("RequestUrl", result.get("Request"));
+          }
+
+          DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
+          DateTime dt2 = fmt.parseDateTime((String) result.get("Time"));
+
+          if (dt1 == null) {
+            toLast = 0;
+          } else {
+            toLast = Math.abs(Seconds.secondsBetween(dt1, dt2).getSeconds());
+          }
+          result.put("ToLast", toLast);
+          IndexRequest ir = new IndexRequest(logIndex, cleanupType).source(result);
+
+          es.getBulkProcessor().add(ir);
+          dt1 = dt2;
+        }
+
+        scrollResp = es.getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
+        if (scrollResp.getHits().getHits().length == 0) {
+          break;
+        }
+      }
+
+    }
+
+    return 1;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}
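
A note on the rate check above: CrawlerDetection treats a user as a crawler when its busiest one-minute window reaches the configured 'sendingrate'; the Elasticsearch date-histogram aggregation, ordered by count, is only used to find that busiest minute. The same idea can be sketched in plain Java, independent of Elasticsearch (class and variable names below are illustrative, not part of this commit):

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RateCheckSketch {

  // Mirrors checkByRate's return convention: 0 if the busiest minute meets the
  // threshold (likely a crawler), 1 if the user looks like a real user.
  static int checkByRate(List<Instant> requestTimes, int rate) {
    Map<Instant, Long> perMinute = new HashMap<>();
    for (Instant t : requestTimes) {
      perMinute.merge(t.truncatedTo(ChronoUnit.MINUTES), 1L, Long::sum);
    }
    long maxPerMinute = perMinute.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    return maxPerMinute >= rate ? 0 : 1;
  }

  public static void main(String[] args) {
    List<Instant> times = Arrays.asList(
        Instant.parse("2017-12-01T10:00:01Z"),
        Instant.parse("2017-12-01T10:00:05Z"),
        Instant.parse("2017-12-01T10:01:07Z"));
    System.out.println(checkByRate(times, 30)); // prints 1 -> treated as a real user
  }
}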

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
new file mode 100644
index 0000000..f92d79c
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/HistoryGenerator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Supports the ability to generate search history (queries) for each
+ * individual user (IP)
+ */
+public class HistoryGenerator extends LogAbstract {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(HistoryGenerator.class);
+
+  public HistoryGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting HistoryGenerator...");
+    startTime = System.currentTimeMillis();
+
+    generateBinaryMatrix();
+
+    endTime = System.currentTimeMillis();
+    LOG.info("HistoryGenerator complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  /**
+   * Method to generate a binary user*query matrix (stored in temporary .csv
+   * file)
+   */
+  public void generateBinaryMatrix() {
+    try {
+
+      LOG.info("Creating user history matrix at {}", props.getProperty("userHistoryMatrix"));
+      File file = new File(props.getProperty("userHistoryMatrix"));
+      if (file.exists()) {
+        file.delete();
+      }
+
+      file.createNewFile();
+
+      FileWriter fw = new FileWriter(file.getAbsoluteFile());
+      BufferedWriter bw = new BufferedWriter(fw);
+      bw.write("Num" + ",");
+
+      // step 1: write first row of csv
+      List<String> logIndexList = es.getIndexListWithPrefix(props.getProperty(MudrodConstants.LOG_INDEX));
+
+      String[] logIndices = logIndexList.toArray(new String[0]);
+      String[] statictypeArray = new String[] { this.sessionStats };
+      int docCount = es.getDocCount(logIndices, statictypeArray);
+
+      SearchResponse sr = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+          .addAggregation(AggregationBuilders.terms("IPs").field("IP").size(docCount)).execute().actionGet();
+      Terms ips = sr.getAggregations().get("IPs");
+      List<String> ipList = new ArrayList<>();
+      for (Terms.Bucket entry : ips.getBuckets()) {
+        // filter out less active users/IPs
+        if (entry.getDocCount() > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) {
+          ipList.add(entry.getKey().toString());
+        }
+      }
+      bw.write(String.join(",", ipList) + "\n");
+
+      // step 2: write the remaining rows of the csv
+      SearchRequestBuilder sr2Builder = es.getClient().prepareSearch(logIndices).setTypes(statictypeArray).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+          .addAggregation(AggregationBuilders.terms("KeywordAgg").field("keywords").size(docCount).subAggregation(AggregationBuilders.terms("IPAgg").field("IP").size(docCount)));
+
+      SearchResponse sr2 = sr2Builder.execute().actionGet();
+      Terms keywords = sr2.getAggregations().get("KeywordAgg");
+
+      for (Terms.Bucket keyword : keywords.getBuckets()) {
+
+        Map<String, Integer> ipMap = new HashMap<>();
+        Terms ipAgg = keyword.getAggregations().get("IPAgg");
+
+        int distinctUser = ipAgg.getBuckets().size();
+        if (distinctUser > Integer.parseInt(props.getProperty(MudrodConstants.MINI_USER_HISTORY))) {
+          bw.write(keyword.getKey() + ",");
+          for (Terms.Bucket IP : ipAgg.getBuckets()) {
+
+            ipMap.put(IP.getKey().toString(), 1);
+          }
+          for (int i = 0; i < ipList.size(); i++) {
+            if (ipMap.containsKey(ipList.get(i))) {
+              bw.write(ipMap.get(ipList.get(i)) + ",");
+            } else {
+              bw.write("0,");
+            }
+          }
+          bw.write("\n");
+        }
+      }
+
+      bw.close();
+    } catch (IOException e) {
+      LOG.error("Error generating user history matrix.", e);
+    }
+
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+}
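
For context, the CSV written by generateBinaryMatrix() is a binary user*query matrix: the header row lists the active IPs, and each following row is a keyword plus a 0/1 flag per IP indicating whether that IP searched for the keyword. A minimal sketch of that shape, using made-up keywords and IPs rather than the Elasticsearch aggregations above:

import java.util.*;

public class BinaryMatrixSketch {
  public static void main(String[] args) {
    // Hypothetical query histories: keyword -> set of IPs that issued the query.
    Map<String, Set<String>> history = new LinkedHashMap<>();
    history.put("ocean temperature", new HashSet<>(Arrays.asList("1.1.1.1", "2.2.2.2")));
    history.put("sea ice", new HashSet<>(Collections.singletonList("2.2.2.2")));

    List<String> ips = Arrays.asList("1.1.1.1", "2.2.2.2");

    // Header row, then one row per keyword with a 0/1 entry per IP.
    StringBuilder csv = new StringBuilder("Num," + String.join(",", ips) + "\n");
    for (Map.Entry<String, Set<String>> e : history.entrySet()) {
      csv.append(e.getKey());
      for (String ip : ips) {
        csv.append(",").append(e.getValue().contains(ip) ? 1 : 0);
      }
      csv.append("\n");
    }
    System.out.print(csv);
    // Num,1.1.1.1,2.2.2.2
    // ocean temperature,1,1
    // sea ice,0,1
  }
}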

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
new file mode 100644
index 0000000..ca47f01
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/ImportLogFile.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ * may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.weblog.structure.ApacheAccessLog;
+import org.apache.sdap.mudrod.weblog.structure.FtpLog;
+import org.apache.spark.api.java.JavaRDD;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Supports the ability to parse and process FTP and HTTP log files
+ */
+public class ImportLogFile extends LogAbstract {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImportLogFile.class);
+
+  /**
+   *
+   */
+  private static final long serialVersionUID = 1L;
+
+  String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] " + "\"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\"";
+
+  public static final int NUM_FIELDS = 9;
+  Pattern p = Pattern.compile(logEntryPattern);
+  transient Matcher matcher;
+
+  /**
+   * Constructor supporting a number of parameters documented below.
+   *
+   * @param props a {@link java.util.Properties} containing Mudrod configuration
+   *              key-value pairs.
+   * @param es    the {@link ESDriver} used to persist log
+   *              files.
+   * @param spark the {@link SparkDriver} used to process
+   *              input log files.
+   */
+  public ImportLogFile(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting Log Import {}", props.getProperty(MudrodConstants.TIME_SUFFIX));
+    startTime = System.currentTimeMillis();
+    readFile();
+    endTime = System.currentTimeMillis();
+    LOG.info("Log Import complete. Time elapsed {} seconds", (endTime - startTime) / 1000);
+    es.refreshIndex();
+    return null;
+  }
+
+  /**
+   * Utility function to convert three-letter month abbreviations such as 'Jan'
+   * within a time string to their numeric equivalents.
+   *
+   * @param time the input time {@link java.lang.String} containing a three-letter month.
+   * @return the time string with the month converted to its numeric value.
+   */
+  public String switchtoNum(String time) {
+    String newTime = time;
+    if (newTime.contains("Jan")) {
+      newTime = newTime.replace("Jan", "1");
+    } else if (newTime.contains("Feb")) {
+      newTime = newTime.replace("Feb", "2");
+    } else if (newTime.contains("Mar")) {
+      newTime = newTime.replace("Mar", "3");
+    } else if (newTime.contains("Apr")) {
+      newTime = newTime.replace("Apr", "4");
+    } else if (newTime.contains("May")) {
+      newTime = newTime.replace("May", "5");
+    } else if (newTime.contains("Jun")) {
+      newTime = newTime.replace("Jun", "6");
+    } else if (newTime.contains("Jul")) {
+      newTime = newTime.replace("Jul", "7");
+    } else if (newTime.contains("Aug")) {
+      newTime = newTime.replace("Aug", "8");
+    } else if (newTime.contains("Sep")) {
+      newTime = newTime.replace("Sep", "9");
+    } else if (newTime.contains("Oct")) {
+      newTime = newTime.replace("Oct", "10");
+    } else if (newTime.contains("Nov")) {
+      newTime = newTime.replace("Nov", "11");
+    } else if (newTime.contains("Dec")) {
+      newTime = newTime.replace("Dec", "12");
+    }
+    return newTime;
+  }
+
+  public void readFile() {
+
+    String httplogpath = null;
+    String ftplogpath = null;
+    
+    File directory = new File(props.getProperty(MudrodConstants.DATA_DIR));
+    File[] fList = directory.listFiles();
+    if (fList == null) {
+      LOG.error("Data directory {} cannot be read, please check your configuration.", props.getProperty(MudrodConstants.DATA_DIR));
+      return;
+    }
+
+    for (File file : fList) {
+      if (file.isFile() && file.getName().contains(props.getProperty(MudrodConstants.TIME_SUFFIX))) {
+        if (file.getName().contains(props.getProperty(MudrodConstants.HTTP_PREFIX))) {
+          httplogpath = file.getAbsolutePath();
+        }
+
+        if (file.getName().contains(props.getProperty(MudrodConstants.FTP_PREFIX))) {
+          ftplogpath = file.getAbsolutePath();
+        }
+      }
+    }
+
+    if (httplogpath == null || ftplogpath == null) {
+      LOG.error("HTTP or FTP log files cannot be found, please check your data directory.");
+      return;
+    }
+
+    String processingType = props.getProperty(MudrodConstants.PROCESS_TYPE, "parallel");
+    if (processingType.equals("sequential")) {
+      readFileInSequential(httplogpath, ftplogpath);
+    } else if (processingType.equals("parallel")) {
+      readFileInParallel(httplogpath, ftplogpath);
+    }
+  }
+
+  /**
+   * Read the HTTP and FTP log files sequentially, processing lines from each
+   * in turn.
+   *
+   * @param httplogpath path to the HTTP log file
+   * @param ftplogpath  path to the FTP log file
+   */
+  public void readFileInSequential(String httplogpath, String ftplogpath) {
+    es.createBulkProcessor();
+    try {
+      readLogFile(httplogpath, "http", logIndex, httpType);
+      readLogFile(ftplogpath, "FTP", logIndex, ftpType);
+
+    } catch (IOException e) {
+      LOG.error("Error whilst reading log file.", e);
+    }
+    es.destroyBulkProcessor();
+  }
+
+  /**
+   * Read the HTTP and FTP log files in parallel using Spark.
+   *
+   * @param httplogpath path to the HTTP log file
+   * @param ftplogpath  path to the FTP log file
+   */
+  public void readFileInParallel(String httplogpath, String ftplogpath) {
+
+    importHttpfile(httplogpath);
+    importFtpfile(ftplogpath);
+  }
+
+  public void importHttpfile(String httplogpath) {
+    // import http logs
+    JavaRDD<String> accessLogs = spark.sc.textFile(httplogpath, this.partition).map(s -> ApacheAccessLog.parseFromLogLine(s)).filter(ApacheAccessLog::checknull);
+
+    JavaEsSpark.saveJsonToEs(accessLogs, logIndex + "/" + this.httpType);
+  }
+
+  public void importFtpfile(String ftplogpath) {
+    // import ftp logs
+    JavaRDD<String> ftpLogs = spark.sc.textFile(ftplogpath, this.partition).map(s -> FtpLog.parseFromLogLine(s)).filter(FtpLog::checknull);
+
+    JavaEsSpark.saveJsonToEs(ftpLogs, logIndex + "/" + this.ftpType);
+  }
+
+  /**
+   * Process a log file on the local file system, parsing each line and
+   * persisting it to Elasticsearch.
+   *
+   * @param fileName the {@link java.lang.String} path to the log file on the file
+   *                 system
+   * @param protocol whether to process 'http' or 'FTP'
+   * @param index    the index name to write logs to
+   * @param type     the Elasticsearch type under which the log entries are persisted.
+   * @throws IOException if there is an error reading anything from the fileName provided.
+   */
+  public void readLogFile(String fileName, String protocol, String index, String type) throws IOException {
+    BufferedReader br = new BufferedReader(new FileReader(fileName));
+    int count = 0;
+    try {
+      String line = br.readLine();
+      while (line != null) {
+        if ("FTP".equals(protocol)) {
+          parseSingleLineFTP(line, index, type);
+        } else {
+          parseSingleLineHTTP(line, index, type);
+        }
+        line = br.readLine();
+        count++;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.error("File not found.", e);
+    } catch (IOException e) {
+      LOG.error("Error reading input directory.", e);
+    } finally {
+      br.close();
+      LOG.info("Num of {} entries:\t{}", protocol, count);
+    }
+  }
+
+  /**
+   * Parse a single FTP log entry
+   *
+   * @param log   a single log line
+   * @param index the index name we wish to persist the log line to
+   * @param type  the Elasticsearch type under which the log entry is persisted.
+   */
+  public void parseSingleLineFTP(String log, String index, String type) {
+    String ip = log.split(" +")[6];
+
+    String time = log.split(" +")[1] + ":" + log.split(" +")[2] + ":" + log.split(" +")[3] + ":" + log.split(" +")[4];
+
+    time = switchtoNum(time);
+    SimpleDateFormat formatter = new SimpleDateFormat("MM:dd:HH:mm:ss:yyyy");
+    Date date = null;
+    try {
+      date = formatter.parse(time);
+    } catch (ParseException e) {
+      LOG.error("Error whilst parsing the date.", e);
+    }
+    String bytes = log.split(" +")[7];
+
+    String request = log.split(" +")[8].toLowerCase();
+
+    if (!request.contains("/misc/") && !request.contains("readme")) {
+      IndexRequest ir;
+      try {
+        ir = new IndexRequest(index, type)
+            .source(jsonBuilder().startObject().field("LogType", "ftp").field("IP", ip).field("Time", date).field("Request", request).field("Bytes", Long.parseLong(bytes)).endObject());
+        es.getBulkProcessor().add(ir);
+      } catch (NumberFormatException e) {
+        LOG.error("Error whilst processing numbers", e);
+      } catch (IOException e) {
+        LOG.error("IOError whilst adding to the bulk processor.", e);
+      }
+    }
+
+  }
+
+  /**
+   * Parse a single HTTP log entry
+   *
+   * @param log   a single log line
+   * @param index the index name we wish to persist the log line to
+   * @param type  the Elasticsearch type under which the log entry is persisted.
+   */
+  public void parseSingleLineHTTP(String log, String index, String type) {
+    matcher = p.matcher(log);
+    if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) {
+      return;
+    }
+    String time = matcher.group(4);
+    time = switchtoNum(time);
+    SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
+    Date date = null;
+    try {
+      date = formatter.parse(time);
+    } catch (ParseException e) {
+      LOG.error("Error whilst attempting to parse date.", e);
+    }
+
+    String bytes = matcher.group(7);
+    if ("-".equals(bytes)) {
+      bytes = "0";
+    }
+
+    String request = matcher.group(5).toLowerCase();
+    String agent = matcher.group(9);
+    CrawlerDetection crawlerDe = new CrawlerDetection(this.props, this.es, this.spark);
+    if (!crawlerDe.checkKnownCrawler(agent)) {
+      boolean tag = false;
+      String[] mimeTypes = { ".js", ".css", ".jpg", ".png", ".ico", "image_captcha", "autocomplete", ".gif", "/alldata/", "/api/", "get / http/1.1", ".jpeg", "/ws/" };
+      for (int i = 0; i < mimeTypes.length; i++) {
+        if (request.contains(mimeTypes[i])) {
+          tag = true;
+          break;
+        }
+      }
+
+      if (!tag) {
+        executeBulkRequest(index, type, matcher, date, bytes);
+      }
+    }
+  }
+
+  private void executeBulkRequest(String index, String type, Matcher matcher, Date date, String bytes) {
+    try {
+      IndexRequest ir = new IndexRequest(index, type).source(
+          jsonBuilder().startObject().field("LogType", "PO.DAAC").field("IP", matcher.group(1)).field("Time", date).field("Request", matcher.group(5)).field("Response", matcher.group(6))
+              .field("Bytes", Integer.parseInt(bytes)).field("Referer", matcher.group(8)).field("Browser", matcher.group(9)).endObject());
+
+      es.getBulkProcessor().add(ir);
+    } catch (NumberFormatException e) {
+      LOG.error("Error whilst processing numbers", e);
+    } catch (IOException e) {
+      LOG.error("IOError whilst adding to the bulk processor.", e);
+    }
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+}
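
For reference, logEntryPattern above follows the Apache combined log format, and parseSingleLineHTTP() relies on groups 1, 4, 5, 6, 7, 8 and 9 for IP, time, request, response code, bytes, referer and browser. A small standalone check of the same pattern against a made-up log line (the sample line is illustrative only):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogParseSketch {
  public static void main(String[] args) {
    // Same pattern as ImportLogFile.logEntryPattern above.
    String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "
        + "\"(.+?)\" (\\d{3}) (\\d+|-) \"((?:[^\"]|\")+)\" \"([^\"]+)\"";
    String line = "127.0.0.1 - - [19/Dec/2017:14:13:07 +0000] "
        + "\"GET /dataset/sample HTTP/1.1\" 200 1234 \"-\" \"Mozilla/5.0\"";

    Matcher m = Pattern.compile(logEntryPattern).matcher(line);
    if (m.matches()) {
      System.out.println("IP:       " + m.group(1)); // 127.0.0.1
      System.out.println("Time:     " + m.group(4)); // 19/Dec/2017:14:13:07 +0000
      System.out.println("Request:  " + m.group(5)); // GET /dataset/sample HTTP/1.1
      System.out.println("Response: " + m.group(6)); // 200
      System.out.println("Bytes:    " + m.group(7)); // 1234
      System.out.println("Referer:  " + m.group(8)); // -
      System.out.println("Browser:  " + m.group(9)); // Mozilla/5.0
    }
  }
}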

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
new file mode 100644
index 0000000..23ddbee
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/LogAbstract.java
@@ -0,0 +1,228 @@
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.main.MudrodConstants;
+import org.apache.sdap.mudrod.weblog.partition.KGreedyPartitionSolver;
+import org.apache.sdap.mudrod.weblog.partition.ThePartitionProblemSolver;
+import org.apache.sdap.mudrod.weblog.partition.logPartitioner;
+import org.apache.spark.Partition;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+public class LogAbstract extends DiscoveryStepAbstract {
+
+  /**
+   * 
+   */
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogAbstract.class);
+
+  public String logIndex = null;
+  public String httpType = null;
+  public String ftpType = null;
+  public String cleanupType = null;
+  public String sessionStats = null;
+  public int partition = 96;
+
+  public LogAbstract(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    if (props != null) {
+      initLogIndex();
+    }
+  }
+
+  protected void initLogIndex() {
+    logIndex = props.getProperty(MudrodConstants.LOG_INDEX) + props.getProperty(MudrodConstants.TIME_SUFFIX);
+    httpType = props.getProperty(MudrodConstants.HTTP_TYPE_PREFIX);
+    ftpType = props.getProperty(MudrodConstants.FTP_TYPE_PREFIX);
+    cleanupType = props.getProperty(MudrodConstants.CLEANUP_TYPE_PREFIX);
+    sessionStats = props.getProperty(MudrodConstants.SESSION_STATS_PREFIX);
+
+    InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS);
+    InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS);
+    JSONObject settingsJSON = null;
+    JSONObject mappingJSON = null;
+
+    try {
+      settingsJSON = new JSONObject(IOUtils.toString(settingsStream));
+    } catch (JSONException | IOException e1) {
+      LOG.error("Error reading Elasticsearch settings!", e1);
+    }
+
+    try {
+      mappingJSON = new JSONObject(IOUtils.toString(mappingsStream));
+    } catch (JSONException | IOException e1) {
+      LOG.error("Error reading Elasticsearch mappings!", e1);
+    }
+
+    try {
+      if (settingsJSON != null && mappingJSON != null) {
+        this.es.putMapping(logIndex, settingsJSON.toString(), mappingJSON.toString());
+      }
+    } catch (IOException e) {
+      LOG.error("Error entering Elasticsearch Mappings!", e);
+    }
+  }
+
+  @Override
+  public Object execute() {
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    return null;
+  }
+
+  public JavaRDD<String> getUserRDD(String... type) {
+    Map<String, Double> userDocs = getUserDocs(type);
+    return parallizeUsers(userDocs);
+  }
+
+  public List<String> getUsers(String type) {
+
+    Terms users = this.getUserTerms(type);
+    List<String> userList = new ArrayList<>();
+    for (Terms.Bucket entry : users.getBuckets()) {
+      String ip = (String) entry.getKey();
+      userList.add(ip);
+    }
+
+    return userList;
+  }
+
+  public Terms getUserTerms(String... type) {
+
+    int docCount = es.getDocCount(logIndex, type);
+
+    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(type).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount)).execute().actionGet();
+    return sr.getAggregations().get("Users");
+  }
+
+  public Map<String, Double> getUserDocs(String... type) {
+
+    Terms users = this.getUserTerms(type);
+    Map<String, Double> userList = new HashMap<>();
+    for (Terms.Bucket entry : users.getBuckets()) {
+      String ip = (String) entry.getKey();
+      Long count = entry.getDocCount();
+      userList.put(ip, Double.valueOf(count));
+    }
+
+    return userList;
+  }
+
+  public Map<String, Long> getUserDailyDocs() {
+
+    int docCount = es.getDocCount(logIndex, httpType);
+
+    AggregationBuilder dailyAgg = AggregationBuilders.dateHistogram("by_day").field("Time").dateHistogramInterval(DateHistogramInterval.DAY).order(Order.COUNT_DESC);
+
+    SearchResponse sr = es.getClient().prepareSearch(logIndex).setTypes(httpType).setQuery(QueryBuilders.matchAllQuery()).setSize(0)
+        .addAggregation(AggregationBuilders.terms("Users").field("IP").size(docCount).subAggregation(dailyAgg)).execute().actionGet();
+    Terms users = sr.getAggregations().get("Users");
+    Map<String, Long> userList = new HashMap<>();
+    for (Terms.Bucket user : users.getBuckets()) {
+      String ip = (String) user.getKey();
+
+      LOG.info("User: {}", ip);
+
+      Histogram agg = user.getAggregations().get("by_day");
+      List<? extends Histogram.Bucket> dateList = agg.getBuckets();
+      int size = dateList.size();
+      for (int i = 0; i < size; i++) {
+        Long count = dateList.get(i).getDocCount();
+        String date = dateList.get(i).getKey().toString();
+
+        LOG.info("{} : {}", date, count);
+      }
+    }
+
+    return userList;
+  }
+
+  protected void checkUserPartition(JavaRDD<String> userRDD) {
+    System.out.println("hhhhh");
+    List<Partition> partitios = userRDD.partitions();
+    System.out.println(partitios.size());
+    int[] partitionIds = new int[partitios.size()];
+    for (int i = 0; i < partitios.size(); i++) {
+      int index = partitios.get(i).index();
+      partitionIds[i] = index;
+    }
+
+    List<String>[] userIPs = userRDD.collectPartitions(partitionIds);
+    for (int i = 0; i < userIPs.length; i++) {
+      List<String> iuser = userIPs[i];
+      System.out.println(i + " partition");
+      System.out.println(iuser.toString());
+    }
+  }
+
+  public JavaRDD<String> parallizeUsers(Map<String, Double> userDocs) {
+
+    // prepare list for parallelization
+    List<Tuple2<String, Double>> list = new ArrayList<>();
+    for (String user : userDocs.keySet()) {
+      list.add(new Tuple2<String, Double>(user, userDocs.get(user)));
+    }
+
+    // group users
+    ThePartitionProblemSolver solution = new KGreedyPartitionSolver();
+    Map<String, Integer> userGroups = solution.solve(userDocs, this.partition);
+
+    JavaPairRDD<String, Double> pairRdd = spark.sc.parallelizePairs(list);
+    JavaPairRDD<String, Double> userPairRDD = pairRdd.partitionBy(new logPartitioner(userGroups, this.partition));
+
+    // repartitioned user RDD
+    return userPairRDD.keys();
+  }
+
+  public Terms getSessionTerms() {
+
+    int docCount = es.getDocCount(this.logIndex, this.cleanupType);
+
+    SearchResponse sr = es.getClient().prepareSearch(this.logIndex).setTypes(this.cleanupType).setQuery(QueryBuilders.matchAllQuery())
+        .addAggregation(AggregationBuilders.terms("Sessions").field("SessionID").size(docCount)).execute().actionGet();
+
+    return sr.getAggregations().get("Sessions");
+  }
+
+  public List<String> getSessions() {
+
+    Terms sessions = this.getSessionTerms();
+    List<String> sessionList = new ArrayList<>();
+    for (Terms.Bucket entry : sessions.getBuckets()) {
+      if (entry.getDocCount() >= 3 && !entry.getKey().equals("invalid")) {
+        String session = (String) entry.getKey();
+        sessionList.add(session);
+      }
+    }
+
+    return sessionList;
+  }
+}
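
parallizeUsers() above balances users across Spark partitions by their document counts using KGreedyPartitionSolver (defined elsewhere in this commit). The underlying greedy idea -- always place the next-heaviest user on the currently lightest partition -- can be sketched as follows; the class and method names here are illustrative, not the solver's actual API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GreedyPartitionSketch {

  // Assign each user (IP -> document count) to one of k partitions so the total
  // document count per partition stays roughly balanced.
  static Map<String, Integer> solve(Map<String, Double> userDocs, int k) {
    double[] load = new double[k];
    Map<String, Integer> assignment = new HashMap<>();

    List<Map.Entry<String, Double>> users = new ArrayList<>(userDocs.entrySet());
    users.sort((a, b) -> Double.compare(b.getValue(), a.getValue())); // heaviest first

    for (Map.Entry<String, Double> user : users) {
      int lightest = 0;
      for (int i = 1; i < k; i++) {
        if (load[i] < load[lightest]) {
          lightest = i;
        }
      }
      load[lightest] += user.getValue();
      assignment.put(user.getKey(), lightest);
    }
    return assignment;
  }

  public static void main(String[] args) {
    Map<String, Double> userDocs = new HashMap<>();
    userDocs.put("1.1.1.1", 500.0);
    userDocs.put("2.2.2.2", 120.0);
    userDocs.put("3.3.3.3", 80.0);
    System.out.println(solve(userDocs, 2)); // e.g. {1.1.1.1=0, 2.2.2.2=1, 3.3.3.3=1}
  }
}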

http://git-wip-us.apache.org/repos/asf/incubator-sdap-mudrod/blob/39379fa9/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java
new file mode 100644
index 0000000..766e853
--- /dev/null
+++ b/core/src/main/java/org/apache/sdap/mudrod/weblog/pre/RankingTrainDataGenerator.java
@@ -0,0 +1,54 @@
+package org.apache.sdap.mudrod.weblog.pre;
+
+import org.apache.sdap.mudrod.discoveryengine.DiscoveryStepAbstract;
+import org.apache.sdap.mudrod.driver.ESDriver;
+import org.apache.sdap.mudrod.driver.SparkDriver;
+import org.apache.sdap.mudrod.weblog.structure.RankingTrainData;
+import org.apache.sdap.mudrod.weblog.structure.SessionExtractor;
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class RankingTrainDataGenerator extends DiscoveryStepAbstract {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(RankingTrainDataGenerator.class);
+
+  public RankingTrainDataGenerator(Properties props, ESDriver es, SparkDriver spark) {
+    super(props, es, spark);
+    // TODO Auto-generated constructor stub
+  }
+
+  @Override
+  public Object execute() {
+    LOG.info("Starting to generate ranking training data.");
+    startTime = System.currentTimeMillis();
+
+    String rankingTrainFile = "E:\\Mudrod_input_data\\Testing_Data_4_1monthLog+Meta+Onto\\traing.txt";
+    try {
+      SessionExtractor extractor = new SessionExtractor();
+      JavaRDD<RankingTrainData> rankingTrainDataRDD = extractor.extractRankingTrainData(this.props, this.es, this.spark);
+
+      JavaRDD<String> rankingTrainData_JsonRDD = rankingTrainDataRDD.map(f -> f.toJson());
+
+      rankingTrainData_JsonRDD.coalesce(1, true).saveAsTextFile(rankingTrainFile);
+
+    } catch (Exception e) {
+      LOG.error("Error generating ranking train data.", e);
+    }
+
+    endTime = System.currentTimeMillis();
+    LOG.info("Ranking train data generation complete. Time elapsed {} seconds.", (endTime - startTime) / 1000);
+    return null;
+  }
+
+  @Override
+  public Object execute(Object o) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}