You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2015/04/15 21:50:02 UTC

[28/50] [abbrv] incubator-lens git commit: Lens-465 : Refactor ml packages. (sharad)

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java
deleted file mode 100644
index bd50cba..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.lens.client.LensClient;
-import org.apache.lens.client.LensClientConfig;
-import org.apache.lens.client.LensMLClient;
-import org.apache.lens.ml.task.MLTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-public class MLRunner {
-
-  private static final Log LOG = LogFactory.getLog(MLRunner.class);
-
-  private LensMLClient mlClient;
-  private String algoName;
-  private String database;
-  private String trainTable;
-  private String trainFile;
-  private String testTable;
-  private String testFile;
-  private String outputTable;
-  private String[] features;
-  private String labelColumn;
-  private HiveConf conf;
-
-  public void init(LensMLClient mlClient, String confDir) throws Exception {
-    File dir = new File(confDir);
-    File propFile = new File(dir, "ml.properties");
-    Properties props = new Properties();
-    props.load(new FileInputStream(propFile));
-    String feat = props.getProperty("features");
-    String trainFile = confDir + File.separator + "train.data";
-    String testFile = confDir + File.separator + "test.data";
-    init(mlClient, props.getProperty("algo"), props.getProperty("database"),
-        props.getProperty("traintable"), trainFile,
-        props.getProperty("testtable"), testFile,
-        props.getProperty("outputtable"), feat.split(","),
-        props.getProperty("labelcolumn"));
-  }
-
-  public void init(LensMLClient mlClient, String algoName,
-      String database, String trainTable, String trainFile,
-      String testTable, String testFile, String outputTable, String[] features,
-      String labelColumn) {
-    this.mlClient = mlClient;
-    this.algoName = algoName;
-    this.database = database;
-    this.trainTable = trainTable;
-    this.trainFile = trainFile;
-    this.testTable = testTable;
-    this.testFile = testFile;
-    this.outputTable = outputTable;
-    this.features = features;
-    this.labelColumn = labelColumn;
-    //hive metastore settings are loaded via lens-site.xml, so loading LensClientConfig
-    //is required
-    this.conf = new HiveConf(new LensClientConfig(), MLRunner.class);
-  }
-
-  public MLTask train() throws Exception {
-    LOG.info("Starting train & eval");
-
-    createTable(trainTable, trainFile);
-    createTable(testTable, testFile);
-    MLTask.Builder taskBuilder = new MLTask.Builder();
-    taskBuilder.algorithm(algoName).hiveConf(conf).labelColumn(labelColumn).outputTable(outputTable)
-        .client(mlClient).trainingTable(trainTable).testTable(testTable);
-
-    // Add features
-    for (String feature : features) {
-      taskBuilder.addFeatureColumn(feature);
-    }
-    MLTask task = taskBuilder.build();
-    LOG.info("Created task " + task.toString());
-    task.run();
-    return task;
-  }
-
-  public void createTable(String tableName, String dataFile) throws HiveException {
-
-    File filedataFile = new File(dataFile);
-    Path dataFilePath = new Path(filedataFile.toURI());
-    Path partDir = dataFilePath.getParent();
-
-    // Create table
-    List<FieldSchema> columns = new ArrayList<FieldSchema>();
-
-    // Label is optional. Not used for unsupervised models.
-    // If present, label will be the first column, followed by features
-    if (labelColumn != null) {
-      columns.add(new FieldSchema(labelColumn, "double", "Labelled Column"));
-    }
-
-    for (String feature : features) {
-      columns.add(new FieldSchema(feature, "double", "Feature " + feature));
-    }
-
-    Table tbl = Hive.get(conf).newTable(database + "." + tableName);
-    tbl.setTableType(TableType.MANAGED_TABLE);
-    tbl.getTTable().getSd().setCols(columns);
-    // tbl.getTTable().getParameters().putAll(new HashMap<String, String>());
-    tbl.setInputFormatClass(TextInputFormat.class);
-    tbl.setSerdeParam(serdeConstants.LINE_DELIM, "\n");
-    tbl.setSerdeParam(serdeConstants.FIELD_DELIM, " ");
-
-    List<FieldSchema> partCols = new ArrayList<FieldSchema>(1);
-    partCols.add(new FieldSchema("dummy_partition_col", "string", ""));
-    tbl.setPartCols(partCols);
-
-    Hive.get(conf).dropTable(database, tableName, false, true);
-    Hive.get(conf).createTable(tbl, true);
-    LOG.info("Created table " + tableName);
-
-    // Add partition for the data file
-    AddPartitionDesc partitionDesc = new AddPartitionDesc(database, tableName,
-        false);
-    Map<String, String> partSpec = new HashMap<String, String>();
-    partSpec.put("dummy_partition_col", "dummy_val");
-    partitionDesc.addPartition(partSpec, partDir.toUri().toString());
-    Hive.get(conf).createPartitions(partitionDesc);
-    LOG.info(tableName + ": Added partition " + partDir.toUri().toString());
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length < 1) {
-      System.out.println("Usage: org.apache.lens.ml.MLRunner <ml-conf-dir>");
-      System.exit(-1);
-    }
-    String confDir = args[0];
-    LensMLClient client = new LensMLClient(new LensClient());
-    MLRunner runner = new MLRunner();
-    runner.init(client, confDir);
-    runner.train();
-    System.out.println("Created the Model successfully. Output Table: " + runner.outputTable);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java
deleted file mode 100644
index 57adecc..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-/**
- * A metric identified by a short name and a free-text description.
- */
-public interface MLTestMetric {
-
-  /**
-   * Returns the short name that identifies this metric.
-   *
-   * @return the metric name
-   */
-  String getName();
-
-  /**
-   * Returns a human-readable description of this metric.
-   *
-   * @return the metric description
-   */
-  String getDescription();
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java
deleted file mode 100644
index 909e6df..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.io.Serializable;
-import java.util.List;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-/**
- * Serializable record describing one model test run: the model and algorithm that were evaluated, the
- * table they were tested against, and where the predictions were written.
- *
- * <p>A no-arg constructor, getters, setters and {@code toString()} for every field are generated by Lombok.
- */
-@NoArgsConstructor
-@ToString
-@Getter
-@Setter
-public class MLTestReport implements Serializable {
-
-  /** Name of the table the model was tested against. */
-  private String testTable;
-
-  /** Name of the table holding the test output. */
-  private String outputTable;
-
-  /** Name of the output column. */
-  private String outputColumn;
-
-  /** Name of the label column. */
-  private String labelColumn;
-
-  /** Names of the feature columns. */
-  private List<String> featureColumns;
-
-  /** Name of the algorithm that produced the model. */
-  private String algorithm;
-
-  /** Identifier of the tested model. */
-  private String modelID;
-
-  /** Identifier of this report. */
-  private String reportID;
-
-  /** Identifier of the query. */
-  private String queryID;
-
-  /** Path of the test output. */
-  private String testOutputPath;
-
-  /** Name of the prediction result column. */
-  private String predictionResultColumn;
-
-  /** Identifier of the Lens query. */
-  private String lensQueryID;
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java
deleted file mode 100644
index 2e240af..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.ServiceProvider;
-import org.apache.lens.server.api.ServiceProviderFactory;
-import org.apache.lens.server.ml.MLService;
-import org.apache.lens.server.ml.MLServiceImpl;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-
-/**
- * Static helpers for resolving algorithm names and locating the server-side ML service.
- */
-public final class MLUtils {
-
-  /** Configuration used to discover the service provider factory; built once at class initialization. */
-  private static final HiveConf HIVE_CONF;
-
-  static {
-    HIVE_CONF = new HiveConf();
-    // Add default config so that we know the service provider implementation
-    HIVE_CONF.addResource("lensserver-default.xml");
-    HIVE_CONF.addResource("lens-site.xml");
-  }
-
-  private MLUtils() {
-  }
-
-  /**
-   * Returns the name declared by the {@link Algorithm} annotation on an algorithm class.
-   *
-   * @param algoClass the algorithm class
-   * @return the annotated algorithm name
-   * @throws IllegalArgumentException if the class is not annotated with {@link Algorithm}
-   */
-  public static String getAlgoName(Class<? extends MLAlgo> algoClass) {
-    Algorithm annotation = algoClass.getAnnotation(Algorithm.class);
-    if (annotation != null) {
-      return annotation.name();
-    }
-    throw new IllegalArgumentException("Algo should be decorated with annotation - " + Algorithm.class.getName());
-  }
-
-  /**
-   * Looks up the ML service registered under {@link MLService#NAME}.
-   *
-   * @return the ML service implementation
-   * @throws Exception if the service provider cannot be created
-   */
-  public static MLServiceImpl getMLService() throws Exception {
-    return getServiceProvider().getService(MLService.NAME);
-  }
-
-  /**
-   * Instantiates the configured {@link ServiceProviderFactory} and returns its service provider.
-   *
-   * @return the service provider
-   * @throws IllegalStateException if no service provider factory class is configured
-   * @throws Exception             if the factory cannot be instantiated
-   */
-  public static ServiceProvider getServiceProvider() throws Exception {
-    Class<? extends ServiceProviderFactory> spfClass = HIVE_CONF.getClass(LensConfConstants.SERVICE_PROVIDER_FACTORY,
-      null, ServiceProviderFactory.class);
-    if (spfClass == null) {
-      // getClass returns the supplied default (null) when the key is unset; fail clearly instead of NPE.
-      throw new IllegalStateException("No service provider factory configured. Set "
-        + LensConfConstants.SERVICE_PROVIDER_FACTORY + " in lensserver-default.xml or lens-site.xml");
-    }
-    ServiceProviderFactory spf = spfClass.newInstance();
-    return spf.getServiceProvider();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
deleted file mode 100644
index 429cbf9..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-/**
- * Loads, caches, saves and deletes ML models and test reports stored on a Hadoop file system.
- *
- * <p>Models are Java-serialized {@link MLModel} objects stored at
- * {@code <lens.ml.model.basedir>/<algorithm>/<modelID>}. Test reports are Java-serialized
- * {@link MLTestReport} objects stored at {@code <lens.ml.test.basedir>/<algorithm>/<reportID>}.
- * Loaded models are kept in a size-bounded in-memory cache keyed by model path, and entries expire after a
- * period without access.
- */
-public final class ModelLoader {
-  private ModelLoader() {
-  }
-
-  /** The Constant MODEL_PATH_BASE_DIR. */
-  public static final String MODEL_PATH_BASE_DIR = "lens.ml.model.basedir";
-
-  /** The Constant MODEL_PATH_BASE_DIR_DEFAULT. */
-  public static final String MODEL_PATH_BASE_DIR_DEFAULT = "file:///tmp";
-
-  /** The Constant LOG. */
-  public static final Log LOG = LogFactory.getLog(ModelLoader.class);
-
-  /** The Constant TEST_REPORT_BASE_DIR. */
-  public static final String TEST_REPORT_BASE_DIR = "lens.ml.test.basedir";
-
-  /** The Constant TEST_REPORT_BASE_DIR_DEFAULT. */
-  public static final String TEST_REPORT_BASE_DIR_DEFAULT = "file:///tmp/ml_reports";
-
-  // Model cache settings
-  /** Maximum number of models held in the cache. */
-  public static final long MODEL_CACHE_SIZE = 10;
-
-  /** Time in milliseconds after the last access at which a cached model expires. */
-  public static final long MODEL_CACHE_TIMEOUT = 3600000L; // one hour
-
-  /** Cache of loaded models keyed by their storage path. */
-  private static final Cache<Path, MLModel> MODEL_CACHE = CacheBuilder.newBuilder().maximumSize(MODEL_CACHE_SIZE)
-    .expireAfterAccess(MODEL_CACHE_TIMEOUT, TimeUnit.MILLISECONDS).build();
-
-  /**
-   * Gets the model location.
-   *
-   * @param conf      the conf
-   * @param algorithm the algorithm
-   * @param modelID   the model id
-   * @return {@code <modelBaseDir>/<algorithm>/<modelID>}
-   */
-  public static Path getModelLocation(Configuration conf, String algorithm, String modelID) {
-    String modelDataBaseDir = conf.get(MODEL_PATH_BASE_DIR, MODEL_PATH_BASE_DIR_DEFAULT);
-    // Model location format - <modelDataBaseDir>/<algorithm>/modelID
-    return new Path(new Path(new Path(modelDataBaseDir), algorithm), modelID);
-  }
-
-  /**
-   * Loads a model, serving it from the cache when present and deserializing it from its path otherwise.
-   *
-   * @param conf      configuration used to resolve the model path and its file system
-   * @param algorithm the algorithm
-   * @param modelID   the model id
-   * @return the ML model
-   * @throws IOException if the model path does not exist, cannot be read, or holds an unknown class
-   */
-  public static MLModel loadModel(final Configuration conf, String algorithm, String modelID) throws IOException {
-    final Path modelPath = getModelLocation(conf, algorithm, modelID);
-    LOG.info("Loading model for algorithm: " + algorithm + " modelID: " + modelID + " At path: "
-      + modelPath.toUri().toString());
-    try {
-      return MODEL_CACHE.get(modelPath, new Callable<MLModel>() {
-        @Override
-        public MLModel call() throws Exception {
-          // Use the caller's configuration, consistent with the other methods here, instead of a fresh HiveConf.
-          FileSystem fs = modelPath.getFileSystem(conf);
-          if (!fs.exists(modelPath)) {
-            throw new IOException("Model path not found " + modelPath.toString());
-          }
-
-          ObjectInputStream ois = null;
-          try {
-            ois = new ObjectInputStream(fs.open(modelPath));
-            MLModel model = (MLModel) ois.readObject();
-            LOG.info("Loaded model " + model.getId() + " from location " + modelPath);
-            return model;
-          } catch (ClassNotFoundException e) {
-            throw new IOException(e);
-          } finally {
-            IOUtils.closeQuietly(ois);
-          }
-        }
-      });
-    } catch (ExecutionException exc) {
-      // The cache wraps loader failures; rethrow the loader's own IOException rather than double-wrapping it.
-      Throwable cause = exc.getCause();
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      }
-      throw new IOException(exc);
-    }
-  }
-
-  /**
-   * Evicts all cached models so that subsequent loads read from storage again.
-   */
-  public static void clearCache() {
-    // cleanUp() alone only performs pending maintenance; invalidateAll() actually discards the entries.
-    MODEL_CACHE.invalidateAll();
-    MODEL_CACHE.cleanUp();
-  }
-
-  /**
-   * Gets the test report path.
-   *
-   * @param conf      the conf
-   * @param algorithm the algorithm
-   * @param report    the report id
-   * @return {@code <testReportBaseDir>/<algorithm>/<report>}
-   */
-  public static Path getTestReportPath(Configuration conf, String algorithm, String report) {
-    String testReportDir = conf.get(TEST_REPORT_BASE_DIR, TEST_REPORT_BASE_DIR_DEFAULT);
-    return new Path(new Path(testReportDir, algorithm), report);
-  }
-
-  /**
-   * Serializes a test report to {@code <testReportBaseDir>/<algorithm>/<reportID>}, creating the
-   * directories if they do not exist.
-   *
-   * @param conf   the conf
-   * @param report the report
-   * @throws IOException if the report cannot be written
-   */
-  public static void saveTestReport(Configuration conf, MLTestReport report) throws IOException {
-    Path reportDir = new Path(conf.get(TEST_REPORT_BASE_DIR, TEST_REPORT_BASE_DIR_DEFAULT));
-    FileSystem fs = reportDir.getFileSystem(conf);
-
-    if (!fs.exists(reportDir)) {
-      LOG.info("Creating test report dir " + reportDir.toUri().toString());
-      fs.mkdirs(reportDir);
-    }
-
-    Path algoDir = new Path(reportDir, report.getAlgorithm());
-
-    if (!fs.exists(algoDir)) {
-      LOG.info("Creating algorithm report dir " + algoDir.toUri().toString());
-      fs.mkdirs(algoDir);
-    }
-
-    ObjectOutputStream reportOutputStream = null;
-    Path reportSaveLocation;
-    try {
-      reportSaveLocation = new Path(algoDir, report.getReportID());
-      reportOutputStream = new ObjectOutputStream(fs.create(reportSaveLocation));
-      reportOutputStream.writeObject(report);
-      reportOutputStream.flush();
-    } catch (IOException ioexc) {
-      LOG.error("Error saving test report " + report.getReportID(), ioexc);
-      throw ioexc;
-    } finally {
-      IOUtils.closeQuietly(reportOutputStream);
-    }
-    LOG.info("Saved report " + report.getReportID() + " at location " + reportSaveLocation.toUri());
-  }
-
-  /**
-   * Loads a previously saved test report.
-   *
-   * <p>Read failures are logged and reported as a {@code null} return rather than an exception.
-   *
-   * @param conf      the conf
-   * @param algorithm the algorithm
-   * @param reportID  the report id
-   * @return the ML test report, or {@code null} if it could not be read
-   * @throws IOException if the stored object's class cannot be found
-   */
-  public static MLTestReport loadReport(Configuration conf, String algorithm, String reportID) throws IOException {
-    Path reportLocation = getTestReportPath(conf, algorithm, reportID);
-    FileSystem fs = reportLocation.getFileSystem(conf);
-    ObjectInputStream reportStream = null;
-    MLTestReport report = null;
-
-    try {
-      reportStream = new ObjectInputStream(fs.open(reportLocation));
-      report = (MLTestReport) reportStream.readObject();
-    } catch (IOException ioex) {
-      LOG.error("Error reading report " + reportLocation, ioex);
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    } finally {
-      IOUtils.closeQuietly(reportStream);
-    }
-    return report;
-  }
-
-  /**
-   * Deletes a stored model and evicts it from the model cache.
-   *
-   * @param conf      the conf
-   * @param algorithm the algorithm
-   * @param modelID   the model id
-   * @throws IOException if the file system cannot be accessed
-   */
-  public static void deleteModel(HiveConf conf, String algorithm, String modelID) throws IOException {
-    Path modelLocation = getModelLocation(conf, algorithm, modelID);
-    FileSystem fs = modelLocation.getFileSystem(conf);
-    fs.delete(modelLocation, false);
-    // Without this, loadModel would keep serving the deleted model from the cache.
-    MODEL_CACHE.invalidate(modelLocation);
-  }
-
-  /**
-   * Deletes a stored test report.
-   *
-   * @param conf      the conf
-   * @param algorithm the algorithm
-   * @param reportID  the report id
-   * @throws IOException if the file system cannot be accessed
-   */
-  public static void deleteTestReport(HiveConf conf, String algorithm, String reportID) throws IOException {
-    Path reportPath = getTestReportPath(conf, algorithm, reportID);
-    reportPath.getFileSystem(conf).delete(reportPath, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java
deleted file mode 100644
index 4794c97..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.util.List;
-
-/**
- * A prediction result made up of several labelled predictions.
- */
-public interface MultiPrediction {
-
-  /**
-   * Returns the individual labelled predictions.
-   *
-   * @return the list of labelled predictions
-   */
-  List<LabelledPrediction> getPredictions();
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java
deleted file mode 100644
index 56f9a88..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import org.apache.lens.api.LensException;
-import org.apache.lens.api.LensSessionHandle;
-import org.apache.lens.api.query.QueryHandle;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- * Base class for running queries against a Lens server within an existing session.
- * Subclasses decide how a query string is submitted.
- */
-public abstract class QueryRunner {
-
-  /** Session under which queries are run. */
-  protected final LensSessionHandle sessionHandle;
-
-  /** Name associated with queries run by this runner; getter and setter are generated by Lombok. */
-  @Getter
-  @Setter
-  protected String queryName;
-
-  /**
-   * Creates a runner bound to the given session.
-   *
-   * @param sessionHandle session under which queries will run
-   */
-  public QueryRunner(LensSessionHandle sessionHandle) {
-    this.sessionHandle = sessionHandle;
-  }
-
-  /**
-   * Submits a query for execution.
-   *
-   * @param query the query text
-   * @return handle identifying the submitted query
-   * @throws LensException if the query cannot be run
-   */
-  public abstract QueryHandle runQuery(String query) throws LensException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java
deleted file mode 100644
index f7fb1f8..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-
-import lombok.Getter;
-
-/**
- * Table specification for running test on a table.
- *
- * <p>Built with {@link #newBuilder()}. {@link #getTestQuery()} and {@link #getCreateOutputTableQuery()} build the
- * HiveQL for a test run; both call {@link #validate()} first and return {@code null} if the spec is invalid.
- */
-public class TableTestingSpec {
-
-  /** The Constant LOG. */
-  public static final Log LOG = LogFactory.getLog(TableTestingSpec.class);
-
-  /** Database of the input table; when {@code null} the table is looked up without an explicit database. */
-  private String db;
-
-  /** The table containing input data. */
-  private String inputTable;
-
-  // TODO use partition condition
-  /** The partition filter. Not yet applied to the test query. */
-  private String partitionFilter;
-
-  /** The feature columns. */
-  private List<String> featureColumns;
-
-  /** The label column. */
-  private String labelColumn;
-
-  /** The output column. */
-  private String outputColumn;
-
-  /** The output table. */
-  private String outputTable;
-
-  /** The conf. */
-  private transient HiveConf conf;
-
-  /** The algorithm. */
-  private String algorithm;
-
-  /** The model id. */
-  private String modelID;
-
-  /** Whether the output table existed when {@link #validate()} last looked it up. */
-  @Getter
-  private boolean outputTableExists;
-
-  /** Identifier of the test run; used as the {@code part_testid} partition value of the output table. */
-  @Getter
-  private String testID;
-
-  /** Input table columns by name; populated by {@link #validate()}. */
-  private HashMap<String, FieldSchema> columnNameToFieldSchema;
-
-  /**
-   * The Class TableTestingSpecBuilder.
-   */
-  public static class TableTestingSpecBuilder {
-
-    /** The spec. */
-    private final TableTestingSpec spec;
-
-    /**
-     * Instantiates a new table testing spec builder.
-     */
-    public TableTestingSpecBuilder() {
-      spec = new TableTestingSpec();
-    }
-
-    /**
-     * Database.
-     *
-     * @param database the database
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder database(String database) {
-      spec.db = database;
-      return this;
-    }
-
-    /**
-     * Set the input table
-     *
-     * @param table the table
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder inputTable(String table) {
-      spec.inputTable = table;
-      return this;
-    }
-
-    /**
-     * Partition filter for input table
-     *
-     * @param partFilter the part filter
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder partitionFilter(String partFilter) {
-      spec.partitionFilter = partFilter;
-      return this;
-    }
-
-    /**
-     * Feature columns.
-     *
-     * @param featureColumns the feature columns
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder featureColumns(List<String> featureColumns) {
-      spec.featureColumns = featureColumns;
-      return this;
-    }
-
-    /**
-     * Label column.
-     *
-     * @param labelColumn the label column
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder labelColumn(String labelColumn) {
-      spec.labelColumn = labelColumn;
-      return this;
-    }
-
-    /**
-     * Misspelled alias of {@link #labelColumn(String)}, kept so existing callers continue to compile.
-     *
-     * @param labelColumn the label column
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder lableColumn(String labelColumn) {
-      return labelColumn(labelColumn);
-    }
-
-    /**
-     * Output column.
-     *
-     * @param outputColumn the output column
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder outputColumn(String outputColumn) {
-      spec.outputColumn = outputColumn;
-      return this;
-    }
-
-    /**
-     * Output table.
-     *
-     * @param table the table
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder outputTable(String table) {
-      spec.outputTable = table;
-      return this;
-    }
-
-    /**
-     * Hive conf.
-     *
-     * @param conf the conf
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder hiveConf(HiveConf conf) {
-      spec.conf = conf;
-      return this;
-    }
-
-    /**
-     * Algorithm.
-     *
-     * @param algorithm the algorithm
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder algorithm(String algorithm) {
-      spec.algorithm = algorithm;
-      return this;
-    }
-
-    /**
-     * Model id.
-     *
-     * @param modelID the model id
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder modelID(String modelID) {
-      spec.modelID = modelID;
-      return this;
-    }
-
-    /**
-     * Returns the configured spec; every call returns the same instance.
-     *
-     * @return the table testing spec
-     */
-    public TableTestingSpec build() {
-      return spec;
-    }
-
-    /**
-     * Set the unique test id
-     *
-     * @param testID the test id, used as the {@code part_testid} partition value of the output table
-     * @return the table testing spec builder
-     */
-    public TableTestingSpecBuilder testID(String testID) {
-      spec.testID = testID;
-      return this;
-    }
-  }
-
-  /**
-   * New builder.
-   *
-   * @return the table testing spec builder
-   */
-  public static TableTestingSpecBuilder newBuilder() {
-    return new TableTestingSpecBuilder();
-  }
-
-  /**
-   * Validate.
-   *
-   * <p>Reads the input table's columns from the metastore, records whether the output table exists, and checks
-   * that at least one feature column is given, that the feature and label columns exist in the input table, and
-   * that the output column and output table are set.
-   *
-   * @return true, if successful
-   */
-  public boolean validate() {
-    List<String> testTableColumns;
-    try {
-      Hive metastoreClient = Hive.get(conf);
-      Table tbl = (db == null) ? metastoreClient.getTable(inputTable) : metastoreClient.getTable(db, inputTable);
-      List<FieldSchema> columns = tbl.getAllCols();
-      columnNameToFieldSchema = new HashMap<String, FieldSchema>();
-      testTableColumns = new ArrayList<String>(columns.size());
-
-      for (FieldSchema fieldSchema : columns) {
-        columnNameToFieldSchema.put(fieldSchema.getName(), fieldSchema);
-        testTableColumns.add(fieldSchema.getName());
-      }
-
-      // Check if output table exists
-      Table outTbl = metastoreClient.getTable(db == null ? "default" : db, outputTable, false);
-      outputTableExists = (outTbl != null);
-    } catch (HiveException exc) {
-      LOG.error("Error getting table info " + toString(), exc);
-      return false;
-    }
-
-    // Reject null (containsAll would throw) and empty (the test query would contain "predict('a', 'm', )").
-    if (featureColumns == null || featureColumns.isEmpty()) {
-      LOG.info("Invalid feature columns: " + featureColumns + ". At least one feature column is required");
-      return false;
-    }
-
-    // Check if labeled column and feature columns are contained in the table
-    if (!testTableColumns.containsAll(featureColumns)) {
-      LOG.info("Invalid feature columns: " + featureColumns + ". Actual columns in table:" + testTableColumns);
-      return false;
-    }
-
-    if (!testTableColumns.contains(labelColumn)) {
-      LOG.info("Invalid label column: " + labelColumn + ". Actual columns in table:" + testTableColumns);
-      return false;
-    }
-
-    if (StringUtils.isBlank(outputColumn)) {
-      LOG.info("Output column is required");
-      return false;
-    }
-
-    if (StringUtils.isBlank(outputTable)) {
-      LOG.info("Output table is required");
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Builds an {@code INSERT OVERWRITE} query that selects the feature columns, the label column and
-   * {@code predict('<algorithm>', '<modelID>', <features>)} from the input table into the output table's
-   * {@code part_testid='<testID>'} partition.
-   *
-   * <p>NOTE(review): the algorithm, model id and test id are embedded as quoted literals without escaping, and the
-   * partition filter is not applied yet.
-   *
-   * @return the test query, or {@code null} if {@link #validate()} fails
-   */
-  public String getTestQuery() {
-    if (!validate()) {
-      return null;
-    }
-
-    // Results always go to the static partition part_testid='<testID>' of the output table
-    StringBuilder q = new StringBuilder("INSERT OVERWRITE TABLE " + outputTable + " PARTITION (part_testid='" + testID
-      + "')  SELECT ");
-    String featureCols = StringUtils.join(featureColumns, ",");
-    q.append(featureCols).append(",").append(labelColumn).append(", ").append("predict(").append("'").append(algorithm)
-      .append("', ").append("'").append(modelID).append("', ").append(featureCols).append(") ").append(outputColumn)
-      .append(" FROM ").append(inputTable);
-
-    return q.toString();
-  }
-
-  /**
-   * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the output table: the feature and label columns
-   * with their input-table types and the output column as {@code string}, partitioned by {@code part_testid}.
-   *
-   * @return the create table query, or {@code null} if {@link #validate()} fails
-   */
-  public String getCreateOutputTableQuery() {
-    // Column types come from the metastore lookup in validate(); without it the lookups below would NPE.
-    if (!validate()) {
-      return null;
-    }
-
-    StringBuilder createTableQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS ").append(outputTable).append("(");
-    // Output table contains feature columns, label column, output column
-    List<String> outputTableColumns = new ArrayList<String>();
-    for (String featureCol : featureColumns) {
-      outputTableColumns.add(featureCol + " " + columnNameToFieldSchema.get(featureCol).getType());
-    }
-
-    outputTableColumns.add(labelColumn + " " + columnNameToFieldSchema.get(labelColumn).getType());
-    outputTableColumns.add(outputColumn + " string");
-
-    createTableQuery.append(StringUtils.join(outputTableColumns, ", "));
-
-    // Append partition column
-    createTableQuery.append(") PARTITIONED BY (part_testid string)");
-
-    return createTableQuery.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java
new file mode 100644
index 0000000..e0d13c0
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field of an algorithm class as a named algorithm parameter.
+ *
+ * <p>Retained at runtime so it can be discovered reflectively; applicable to fields only.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface AlgoParam {
+
+  /**
+   * Parameter name.
+   *
+   * @return the name identifying this parameter
+   */
+  String name();
+
+  /**
+   * Help text.
+   *
+   * @return a human-readable description of the parameter
+   */
+  String help();
+
+  /**
+   * Default value of the parameter, in string form.
+   *
+   * @return the default value; {@code "None"} when not specified
+   */
+  String defaultValue() default "None";
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java
new file mode 100644
index 0000000..29bde29
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a class as an ML algorithm and supplies its name and description.
+ *
+ * <p>Retained at runtime so it can be read reflectively; applicable to types only.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Algorithm {
+
+  /**
+   * Algorithm name.
+   *
+   * @return the name identifying this algorithm
+   */
+  String name();
+
+  /**
+   * Algorithm description.
+   *
+   * @return a human-readable description of the algorithm
+   */
+  String description();
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java
new file mode 100644
index 0000000..44b0043
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensException;
+
+/**
+ * A machine-learning algorithm that can be configured and trained on a table.
+ */
+public interface MLAlgo {
+
+  /**
+   * Gets the algorithm name.
+   *
+   * @return the name of this algorithm
+   */
+  String getName();
+
+  /**
+   * Gets the algorithm description.
+   *
+   * @return a human-readable description of this algorithm
+   */
+  String getDescription();
+
+  /**
+   * Applies the given configuration to this algorithm.
+   *
+   * @param configuration the configuration
+   */
+  void configure(LensConf configuration);
+
+  /**
+   * Gets the configuration held by this algorithm.
+   *
+   * @return the configuration (how it relates to {@link #configure(LensConf)} is implementation-defined)
+   */
+  LensConf getConf();
+
+  /**
+   * Trains a model.
+   *
+   * @param conf    the configuration for this training run
+   * @param db      the database of the training table
+   * @param table   the training table
+   * @param modelId the id to give the resulting model
+   * @param params  additional training arguments
+   * @return the trained model
+   * @throws LensException if training fails
+   */
+  MLModel train(LensConf conf, String db, String table, String modelId, String... params) throws LensException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java
new file mode 100644
index 0000000..1aa699d
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.util.List;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensException;
+
+/**
+ * Provides ML algorithm implementations and follows an init/start/stop lifecycle.
+ */
+public interface MLDriver {
+
+  /**
+   * Initializes the driver.
+   *
+   * @param conf the configuration
+   * @throws LensException if initialization fails
+   */
+  void init(LensConf conf) throws LensException;
+
+  /**
+   * Starts the driver.
+   *
+   * @throws LensException if the driver cannot be started
+   */
+  void start() throws LensException;
+
+  /**
+   * Stops the driver.
+   *
+   * @throws LensException if the driver cannot be stopped
+   */
+  void stop() throws LensException;
+
+  /**
+   * Lists the algorithms available from this driver.
+   *
+   * @return the algorithm names
+   */
+  List<String> getAlgoNames();
+
+  /**
+   * Tells whether this driver supports the named algorithm.
+   *
+   * @param algo the algorithm name
+   * @return {@code true} if the algorithm is supported
+   */
+  boolean isAlgoSupported(String algo);
+
+  /**
+   * Returns an instance of the named algorithm.
+   *
+   * @param algo the algorithm name
+   * @return the algorithm instance
+   * @throws LensException if the instance cannot be obtained
+   */
+  MLAlgo getAlgoInstance(String algo) throws LensException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java
new file mode 100644
index 0000000..73717ac
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.List;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Base class for a trained model that produces predictions of type {@code PREDICTION}.
+ *
+ * <p>Lombok generates a no-arg constructor, {@code toString()}, and a getter and setter for every field.
+ */
+@Getter
+@Setter
+@ToString
+@NoArgsConstructor
+public abstract class MLModel<PREDICTION> implements Serializable {
+
+  /** The model id. */
+  private String id;
+
+  /** When the model was created. */
+  private Date createdAt;
+
+  /** Name of the algorithm associated with the model. */
+  private String algoName;
+
+  /** The table associated with the model (presumably the training table). */
+  private String table;
+
+  /** The params associated with the model (presumably the training arguments). */
+  private List<String> params;
+
+  /** The label column. */
+  private String labelColumn;
+
+  /** The feature columns. */
+  private List<String> featureColumns;
+
+  /**
+   * Computes a prediction from the given inputs.
+   *
+   * @param args the input values
+   * @return the prediction
+   */
+  public abstract PREDICTION predict(Object... args);
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java
new file mode 100644
index 0000000..51979d8
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.ml.algo.api.AlgoParam;
+import org.apache.lens.ml.algo.api.MLAlgo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Parses algorithm arguments given as alternating key/value pairs and applies them to {@link AlgoParam} fields.
+ */
+public final class AlgoArgParser {
+  private AlgoArgParser() {
+  }
+
+  /**
+   * Converts the string form of a parameter into the value assigned to the annotated field. Implementations are
+   * instantiated reflectively through their no-arg constructor.
+   *
+   * @param <E> the type produced by the parser
+   */
+  public abstract static class CustomArgParser<E> {
+
+    /**
+     * Parses a parameter value.
+     *
+     * @param value the trimmed string value
+     * @return the parsed value
+     */
+    public abstract E parse(String value);
+  }
+
+  /** The Constant LOG. */
+  public static final Log LOG = LogFactory.getLog(AlgoArgParser.class);
+
+  /** Argument key whose values are collected as feature column names (matched case-insensitively). */
+  private static final String FEATURE_KEY = "feature";
+
+  /** Configuration prefix under which a {@link CustomArgParser} class name can be set per parameter. */
+  private static final String CUSTOM_PARSER_PREFIX = "lens.ml.args.";
+
+  /**
+   * Extracts feature names. If the algo has any parameters associated with @AlgoParam annotation, those are set
+   * as well.
+   *
+   * <p>{@code args} is read as alternating key/value pairs, both trimmed. Values of the key {@code feature} are
+   * returned as feature columns. A key equal to the {@link AlgoParam#name()} of a field declared directly on the
+   * algo's class sets that field: {@code String}, {@code int}, {@code long} and {@code double} fields (primitive or
+   * boxed) are converted directly; other types need a {@link CustomArgParser} whose class name is configured under
+   * {@code lens.ml.args.<key>}. Other keys are ignored. A value that cannot be applied is logged and skipped so the
+   * remaining arguments are still processed.
+   *
+   * @param algo the algo whose annotated fields are populated
+   * @param args alternating key/value pairs; may be {@code null}. A trailing key without a value is ignored
+   * @return feature column names in argument order; never {@code null}
+   */
+  public static List<String> parseArgs(MLAlgo algo, String[] args) {
+    List<String> featureColumns = new ArrayList<String>();
+    if (args == null) {
+      return featureColumns;
+    }
+    Map<String, Field> fieldMap = getParamFields(algo.getClass());
+
+    // i + 1 < length: never read past the end when a trailing key has no value.
+    for (int i = 0; i + 1 < args.length; i += 2) {
+      if (args[i] == null || args[i + 1] == null) {
+        LOG.warn("Ignored param " + args[i] + "=" + args[i + 1] + " as key or value is null");
+        continue;
+      }
+      String key = args[i].trim();
+      String value = args[i + 1].trim();
+
+      try {
+        if (FEATURE_KEY.equalsIgnoreCase(key)) {
+          featureColumns.add(value);
+        } else if (fieldMap.containsKey(key)) {
+          setParam(algo, fieldMap.get(key), key, value);
+        }
+      } catch (Exception exc) {
+        // Best effort: one bad value must not prevent the remaining params from being applied.
+        LOG.error("Error while setting param " + key + " to " + value + " for algo " + algo, exc);
+      }
+    }
+    if (args.length % 2 != 0) {
+      LOG.warn("Ignored param " + args[args.length - 1] + " as it has no value");
+    }
+    return featureColumns;
+  }
+
+  /**
+   * Collects the {@link AlgoParam}-annotated fields declared directly on {@code algoClass}, keyed by param name,
+   * and makes them accessible for reflective assignment.
+   */
+  private static Map<String, Field> getParamFields(Class<? extends MLAlgo> algoClass) {
+    Map<String, Field> fieldMap = new HashMap<String, Field>();
+    for (Field fld : algoClass.getDeclaredFields()) {
+      AlgoParam paramAnnotation = fld.getAnnotation(AlgoParam.class);
+      if (paramAnnotation != null) {
+        fld.setAccessible(true);
+        fieldMap.put(paramAnnotation.name(), fld);
+      }
+    }
+    return fieldMap;
+  }
+
+  /** Converts {@code value} to the type of field {@code f} and assigns it on {@code algo}. */
+  private static void setParam(MLAlgo algo, Field f, String key, String value)
+    throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+    Class<?> type = f.getType();
+    if (String.class.equals(type)) {
+      f.set(algo, value);
+    } else if (Integer.TYPE.equals(type) || Integer.class.equals(type)) {
+      // Field.set unwraps the boxed value when the field is primitive.
+      f.set(algo, Integer.valueOf(value));
+    } else if (Double.TYPE.equals(type) || Double.class.equals(type)) {
+      f.set(algo, Double.valueOf(value));
+    } else if (Long.TYPE.equals(type) || Long.class.equals(type)) {
+      f.set(algo, Long.valueOf(value));
+    } else {
+      // check if the algo provides a deserializer for this param
+      String customParserClass = getCustomParserClass(algo, key);
+      if (customParserClass != null) {
+        // Checked cast: a configured class that is not a CustomArgParser fails with ClassCastException.
+        CustomArgParser<?> parser = (CustomArgParser<?>) Class.forName(customParserClass).newInstance();
+        f.set(algo, parser.parse(value));
+      } else {
+        LOG.warn("Ignored param " + key + "=" + value + " as no parser found");
+      }
+    }
+  }
+
+  /** Returns the parser class configured for {@code key}, or {@code null} if the algo has no such setting. */
+  private static String getCustomParserClass(MLAlgo algo, String key) {
+    if (algo.getConf() == null || algo.getConf().getProperties() == null) {
+      return null;
+    }
+    return algo.getConf().getProperties().get(CUSTOM_PARSER_PREFIX + key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java
new file mode 100644
index 0000000..a2fd94b
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.api.LensException;
+import org.apache.lens.ml.algo.api.Algorithm;
+import org.apache.lens.ml.algo.api.MLAlgo;
+
+/**
+ * Registry of algorithm classes keyed by the name declared in their {@link Algorithm} annotation.
+ */
+public class Algorithms {
+
+  /** Registered algorithm classes by algorithm name. */
+  private final Map<String, Class<? extends MLAlgo>> algorithmClasses =
+    new HashMap<String, Class<? extends MLAlgo>>();
+
+  /**
+   * Registers an algorithm class under the name declared by its {@link Algorithm} annotation. A later registration
+   * with the same name replaces the earlier one.
+   *
+   * @param algoClass the algorithm class
+   * @throws IllegalArgumentException if {@code algoClass} is {@code null} or lacks the {@link Algorithm} annotation
+   */
+  public void register(Class<? extends MLAlgo> algoClass) {
+    Algorithm annotation = (algoClass == null) ? null : algoClass.getAnnotation(Algorithm.class);
+    if (annotation == null) {
+      throw new IllegalArgumentException("Not a valid algorithm class: " + algoClass);
+    }
+    algorithmClasses.put(annotation.name(), algoClass);
+  }
+
+  /**
+   * Instantiates the algorithm registered under {@code name} through its public {@code (String, String)}
+   * constructor, passing the name and the description from its {@link Algorithm} annotation.
+   *
+   * @param name the algorithm name
+   * @return a new algorithm instance, or {@code null} if nothing is registered under {@code name}
+   * @throws LensException if the algorithm cannot be instantiated
+   */
+  public MLAlgo getAlgoForName(String name) throws LensException {
+    Class<? extends MLAlgo> algoClass = algorithmClasses.get(name);
+    if (algoClass != null) {
+      String description = algoClass.getAnnotation(Algorithm.class).description();
+      try {
+        Constructor<? extends MLAlgo> ctor = algoClass.getConstructor(String.class, String.class);
+        return ctor.newInstance(name, description);
+      } catch (Exception exc) {
+        throw new LensException("Unable to get algo: " + name, exc);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Tells whether an algorithm is registered under {@code name}.
+   *
+   * @param name the algorithm name
+   * @return {@code true} if registered
+   */
+  public boolean isAlgoSupported(String name) {
+    return algorithmClasses.containsKey(name);
+  }
+
+  /**
+   * Lists the registered algorithm names.
+   *
+   * @return a new list of names, in no particular order
+   */
+  public List<String> getAlgorithmNames() {
+    List<String> names = new ArrayList<String>(algorithmClasses.size());
+    names.addAll(algorithmClasses.keySet());
+    return names;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java
new file mode 100644
index 0000000..a960a4a
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import org.apache.lens.ml.algo.api.MLModel;
+
+/**
+ * Return a single double value as a prediction. This is useful in classifiers where the classifier returns a single
+ * class label as a prediction.
+ */
+public abstract class ClassifierBaseModel extends MLModel<Double> {
+
+  /**
+   * Gets the feature vector.
+   *
+   * <p>Each argument becomes one entry: a {@link Number} contributes its {@code doubleValue()}, a {@link String} is
+   * parsed with {@link Double#parseDouble(String)}, and anything else (including {@code null}) becomes {@code 0.0}.
+   *
+   * @param args the feature values; {@code null} is treated as no features
+   * @return the feature vector, with one entry per argument
+   * @throws NumberFormatException if a {@code String} argument is not a valid number
+   */
+  public final double[] getFeatureVector(Object[] args) {
+    if (args == null) {
+      return new double[0];
+    }
+    double[] features = new double[args.length];
+    for (int i = 0; i < args.length; i++) {
+      if (args[i] instanceof Number) {
+        // Covers Integer, Long, Float, etc. as well as Double, so non-double numeric features are not zeroed.
+        features[i] = ((Number) args[i]).doubleValue();
+      } else if (args[i] instanceof String) {
+        features[i] = Double.parseDouble((String) args[i]);
+      } else {
+        features[i] = 0.0;
+      }
+    }
+    return features;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java
new file mode 100644
index 0000000..16a6180
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.lens.ml.algo.api.MLModel;
+
+/**
+ * A forecasting model whose prediction is a series of (timestamp, value) pairs.
+ */
+public class ForecastingModel extends MLModel<MultiPrediction> {
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.lens.ml.algo.api.MLModel#predict(java.lang.Object[])
+   */
+  @Override
+  public MultiPrediction predict(Object... args) {
+    // This model ignores its arguments and produces no points; return an empty series rather than a null list.
+    return new ForecastingPredictions(Collections.<LabelledPrediction>emptyList());
+  }
+
+  /**
+   * A series of forecast points.
+   */
+  public static class ForecastingPredictions implements MultiPrediction {
+
+    /** The forecast points; never {@code null}. */
+    private final List<LabelledPrediction> values;
+
+    /**
+     * Instantiates a new forecasting predictions.
+     *
+     * @param values the forecast points; {@code null} is treated as an empty series
+     */
+    public ForecastingPredictions(List<LabelledPrediction> values) {
+      this.values = values == null ? Collections.<LabelledPrediction>emptyList() : values;
+    }
+
+    /**
+     * Returns the forecast points.
+     *
+     * @return the forecast points, never {@code null}
+     */
+    @Override
+    public List<LabelledPrediction> getPredictions() {
+      return values;
+    }
+  }
+
+  /**
+   * A single forecast point: the timestamp is the label and the forecast value is the prediction.
+   */
+  public static class ForecastingLabel implements LabelledPrediction<Long, Double> {
+
+    /** The timestamp. */
+    private final long timestamp;
+
+    /** The value. */
+    private final double value;
+
+    /**
+     * Instantiates a new forecasting label.
+     *
+     * @param timestamp the timestamp
+     * @param value     the value
+     */
+    public ForecastingLabel(long timestamp, double value) {
+      this.timestamp = timestamp;
+      this.value = value;
+    }
+
+    @Override
+    public Long getLabel() {
+      return timestamp;
+    }
+
+    @Override
+    public Double getPrediction() {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java
new file mode 100644
index 0000000..9c7737e
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+/**
+ * A single labelled prediction, used when a model's output is more complex than one value. For example, a forecast
+ * is a series of (timestamp, value) pairs, where the timestamp is the label and the value is the prediction.
+ *
+ * @param <LABELTYPE>      type of the label that identifies this prediction (for a forecast, a timestamp)
+ * @param <PREDICTIONTYPE> type of the predicted value
+ */
+public interface LabelledPrediction<LABELTYPE, PREDICTIONTYPE> {
+
+  /**
+   * Returns the label this prediction belongs to.
+   *
+   * @return the label
+   */
+  LABELTYPE getLabel();
+
+  /**
+   * Returns the value predicted for the label.
+   *
+   * @return the prediction
+   */
+  PREDICTIONTYPE getPrediction();
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java
new file mode 100644
index 0000000..e910a92
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.util.List;
+
+/**
+ * A model prediction made up of several labelled predictions, for example the points of a forecast series.
+ */
+public interface MultiPrediction {
+
+  /**
+   * Returns the individual labelled predictions that make up this prediction.
+   *
+   * @return the labelled predictions
+   */
+  List<LabelledPrediction> getPredictions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java
new file mode 100644
index 0000000..4012085
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensException;
+import org.apache.lens.ml.algo.api.AlgoParam;
+import org.apache.lens.ml.algo.api.Algorithm;
+import org.apache.lens.ml.algo.api.MLAlgo;
+import org.apache.lens.ml.algo.api.MLModel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * The Class BaseSparkAlgo.
+ */
+public abstract class BaseSparkAlgo implements MLAlgo {
+
+  /** The Constant LOG. */
+  public static final Log LOG = LogFactory.getLog(BaseSparkAlgo.class);
+
+  /** The name. */
+  private final String name;
+
+  /** The description. */
+  private final String description;
+
+  /** The spark context. */
+  protected JavaSparkContext sparkContext;
+
+  /** The params. */
+  protected Map<String, String> params;
+
+  /** The conf. */
+  protected transient LensConf conf;
+
+  /** The training fraction. */
+  @AlgoParam(name = "trainingFraction", help = "% of dataset to be used for training", defaultValue = "0")
+  protected double trainingFraction;
+
+  /** The use training fraction. */
+  private boolean useTrainingFraction;
+
+  /** The label. */
+  @AlgoParam(name = "label", help = "Name of column which is used as a training label for supervised learning")
+  protected String label;
+
+  /** The partition filter. */
+  @AlgoParam(name = "partition", help = "Partition filter used to create create HCatInputFormats")
+  protected String partitionFilter;
+
+  /** The features. */
+  @AlgoParam(name = "feature", help = "Column name(s) which are to be used as sample features")
+  protected List<String> features;
+
+  /**
+   * Instantiates a new base spark algo.
+   *
+   * @param name        the name
+   * @param description the description
+   */
+  public BaseSparkAlgo(String name, String description) {
+    this.name = name;
+    this.description = description;
+  }
+
+  public void setSparkContext(JavaSparkContext sparkContext) {
+    this.sparkContext = sparkContext;
+  }
+
+  @Override
+  public LensConf getConf() {
+    return conf;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.lens.ml.MLAlgo#configure(org.apache.lens.api.LensConf)
+   */
+  @Override
+  public void configure(LensConf configuration) {
+    this.conf = configuration;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.lens.ml.MLAlgo#train(org.apache.lens.api.LensConf, java.lang.String, java.lang.String,
+   * java.lang.String, java.lang.String[])
+   */
+  @Override
+  public MLModel<?> train(LensConf conf, String db, String table, String modelId, String... params)
+    throws LensException {
+    parseParams(params);
+
+    TableTrainingSpec.TableTrainingSpecBuilder builder = TableTrainingSpec.newBuilder().hiveConf(toHiveConf(conf))
+      .database(db).table(table).partitionFilter(partitionFilter).featureColumns(features).labelColumn(label);
+
+    if (useTrainingFraction) {
+      builder.trainingFraction(trainingFraction);
+    }
+
+    TableTrainingSpec spec = builder.build();
+    LOG.info("Training " + " with " + features.size() + " features");
+
+    spec.createRDDs(sparkContext);
+
+    RDD<LabeledPoint> trainingRDD = spec.getTrainingRDD();
+    BaseSparkClassificationModel<?> model = trainInternal(modelId, trainingRDD);
+    model.setTable(table);
+    model.setParams(Arrays.asList(params));
+    model.setLabelColumn(label);
+    model.setFeatureColumns(features);
+    return model;
+  }
+
+  /**
+   * To hive conf.
+   *
+   * @param conf the conf
+   * @return the hive conf
+   */
+  protected HiveConf toHiveConf(LensConf conf) {
+    HiveConf hiveConf = new HiveConf();
+    for (String key : conf.getProperties().keySet()) {
+      hiveConf.set(key, conf.getProperties().get(key));
+    }
+    return hiveConf;
+  }
+
+  /**
+   * Parses the params.
+   *
+   * @param args the args
+   */
+  public void parseParams(String[] args) {
+    if (args.length % 2 != 0) {
+      throw new IllegalArgumentException("Invalid number of params " + args.length);
+    }
+
+    params = new LinkedHashMap<String, String>();
+
+    for (int i = 0; i < args.length; i += 2) {
+      if ("f".equalsIgnoreCase(args[i]) || "feature".equalsIgnoreCase(args[i])) {
+        if (features == null) {
+          features = new ArrayList<String>();
+        }
+        features.add(args[i + 1]);
+      } else if ("l".equalsIgnoreCase(args[i]) || "label".equalsIgnoreCase(args[i])) {
+        label = args[i + 1];
+      } else {
+        params.put(args[i].replaceAll("\\-+", ""), args[i + 1]);
+      }
+    }
+
+    if (params.containsKey("trainingFraction")) {
+      // Get training Fraction
+      String trainingFractionStr = params.get("trainingFraction");
+      try {
+        trainingFraction = Double.parseDouble(trainingFractionStr);
+        useTrainingFraction = true;
+      } catch (NumberFormatException nfe) {
+        throw new IllegalArgumentException("Invalid training fraction", nfe);
+      }
+    }
+
+    if (params.containsKey("partition") || params.containsKey("p")) {
+      partitionFilter = params.containsKey("partition") ? params.get("partition") : params.get("p");
+    }
+
+    parseAlgoParams(params);
+  }
+
+  /**
+   * Gets the param value.
+   *
+   * @param param      the param
+   * @param defaultVal the default val
+   * @return the param value
+   */
+  public double getParamValue(String param, double defaultVal) {
+    if (params.containsKey(param)) {
+      try {
+        return Double.parseDouble(params.get(param));
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Couldn't parse param value: " + param + " as double.");
+      }
+    }
+    return defaultVal;
+  }
+
+  /**
+   * Gets the param value.
+   *
+   * @param param      the param
+   * @param defaultVal the default val
+   * @return the param value
+   */
+  public int getParamValue(String param, int defaultVal) {
+    if (params.containsKey(param)) {
+      try {
+        return Integer.parseInt(params.get(param));
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Couldn't parse param value: " + param + " as integer.");
+      }
+    }
+    return defaultVal;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public Map<String, String> getArgUsage() {
+    Map<String, String> usage = new LinkedHashMap<String, String>();
+    Class<?> clz = this.getClass();
+    // Put class name and description as well as part of the usage
+    Algorithm algorithm = clz.getAnnotation(Algorithm.class);
+    if (algorithm != null) {
+      usage.put("Algorithm Name", algorithm.name());
+      usage.put("Algorithm Description", algorithm.description());
+    }
+
+    // Get all algo params including base algo params
+    while (clz != null) {
+      for (Field field : clz.getDeclaredFields()) {
+        AlgoParam param = field.getAnnotation(AlgoParam.class);
+        if (param != null) {
+          usage.put("[param] " + param.name(), param.help() + " Default Value = " + param.defaultValue());
+        }
+      }
+
+      if (clz.equals(BaseSparkAlgo.class)) {
+        break;
+      }
+      clz = clz.getSuperclass();
+    }
+    return usage;
+  }
+
+  /**
+   * Parses the algo params.
+   *
+   * @param params the params
+   */
+  public abstract void parseAlgoParams(Map<String, String> params);
+
+  /**
+   * Train internal.
+   *
+   * @param modelId     the model id
+   * @param trainingRDD the training rdd
+   * @return the base spark classification model
+   * @throws LensException the lens exception
+   */
+  protected abstract BaseSparkClassificationModel trainInternal(String modelId, RDD<LabeledPoint> trainingRDD)
+    throws LensException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java
new file mode 100644
index 0000000..806dc1f
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import org.apache.lens.ml.algo.lib.ClassifierBaseModel;
+
+import org.apache.spark.mllib.classification.ClassificationModel;
+import org.apache.spark.mllib.linalg.Vectors;
+
+/**
+ * The Class BaseSparkClassificationModel.
+ *
+ * @param <MODEL> the generic type
+ */
+public class BaseSparkClassificationModel<MODEL extends ClassificationModel> extends ClassifierBaseModel {
+
+  /** The model id. */
+  private final String modelId;
+
+  /** The spark model. */
+  private final MODEL sparkModel;
+
+  /**
+   * Instantiates a new base spark classification model.
+   *
+   * @param modelId the model id
+   * @param model   the model
+   */
+  public BaseSparkClassificationModel(String modelId, MODEL model) {
+    this.modelId = modelId;
+    this.sparkModel = model;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.lens.ml.MLModel#predict(java.lang.Object[])
+   */
+  @Override
+  public Double predict(Object... args) {
+    return sparkModel.predict(Vectors.dense(getFeatureVector(args)));
+  }
+
+  @Override
+  public String getId() {
+    return modelId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java
new file mode 100644
index 0000000..d75efc0
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.log4j.Logger;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+import com.google.common.base.Preconditions;
+import scala.Tuple2;
+
+/**
+ * A feature function that directly maps an HCatRecord to a feature vector. Each column becomes a feature in the vector,
+ * with the value of the feature obtained using the value mapper for that column. The label column is converted with
+ * its own label value mapper.
+ */
+public class ColumnFeatureFunction extends FeatureFunction {
+
+  /** The Constant LOG. */
+  public static final Logger LOG = Logger.getLogger(ColumnFeatureFunction.class);
+
+  /** Value mapper for each feature, parallel to {@link #featurePositions}. */
+  private final FeatureValueMapper[] featureValueMappers;
+
+  /** Position in the HCatRecord of each feature column. */
+  private final int[] featurePositions;
+
+  /** Position in the HCatRecord of the label column. */
+  private final int labelColumnPos;
+
+  /** Mapper used to convert the label column value to a double. */
+  private final FeatureValueMapper labelValueMapper;
+
+  /** Number of features in each output vector. */
+  private final int numFeatures;
+
+  /** Labeled point returned for null records. */
+  private final LabeledPoint defaultLabeledPoint;
+
+  /**
+   * Feature positions and value mappers are parallel arrays. featurePositions[i] gives the position of ith feature in
+   * the HCatRecord, and valueMappers[i] gives the value mapper used to map that feature to a Double value. The label
+   * column is mapped with a {@link DoubleValueMapper}.
+   *
+   * @param featurePositions position number of feature column in the HCatRecord
+   * @param valueMappers     mapper for each feature, parallel to {@code featurePositions}
+   * @param labelColumnPos   position of the label column in the HCatRecord
+   * @param numFeatures      number of features in the feature vector
+   * @param defaultLabel     default label to be used for null records
+   */
+  public ColumnFeatureFunction(int[] featurePositions, FeatureValueMapper[] valueMappers, int labelColumnPos,
+    int numFeatures, double defaultLabel) {
+    this(featurePositions, valueMappers, labelColumnPos, new DoubleValueMapper(), numFeatures, defaultLabel);
+  }
+
+  /**
+   * Creates a feature function with an explicit mapper for the label column.
+   *
+   * @param featurePositions position number of feature column in the HCatRecord
+   * @param valueMappers     mapper for each feature, parallel to {@code featurePositions}
+   * @param labelColumnPos   position of the label column in the HCatRecord
+   * @param labelValueMapper mapper used to convert the label column to a double
+   * @param numFeatures      number of features in the feature vector
+   * @param defaultLabel     default label to be used for null records
+   * @throws NullPointerException     if either array or the label mapper is null
+   * @throws IllegalArgumentException if the arrays differ in length, or numFeatures is negative or larger than the
+   *                                  number of feature positions
+   */
+  public ColumnFeatureFunction(int[] featurePositions, FeatureValueMapper[] valueMappers, int labelColumnPos,
+    FeatureValueMapper labelValueMapper, int numFeatures, double defaultLabel) {
+    Preconditions.checkNotNull(valueMappers, "Value mappers argument is required");
+    Preconditions.checkNotNull(featurePositions, "Feature positions are required");
+    Preconditions.checkNotNull(labelValueMapper, "Label value mapper is required");
+    Preconditions.checkArgument(valueMappers.length == featurePositions.length,
+      "Mismatch between number of value mappers and feature positions");
+    // call() reads featurePositions[i] and valueMappers[i] for every i < numFeatures; fail here, not per record.
+    Preconditions.checkArgument(numFeatures >= 0 && numFeatures <= featurePositions.length,
+      "Invalid number of features %s for %s feature positions", numFeatures, featurePositions.length);
+
+    this.featurePositions = featurePositions;
+    this.featureValueMappers = valueMappers;
+    this.labelColumnPos = labelColumnPos;
+    this.labelValueMapper = labelValueMapper;
+    this.numFeatures = numFeatures;
+    defaultLabeledPoint = new LabeledPoint(defaultLabel, Vectors.dense(new double[numFeatures]));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.lens.ml.algo.spark.FeatureFunction#call(scala.Tuple2)
+   */
+  @Override
+  public LabeledPoint call(Tuple2<WritableComparable, HCatRecord> tuple) throws Exception {
+    HCatRecord record = tuple._2();
+
+    if (record == null) {
+      LOG.info("@@@ Null record");
+      return defaultLabeledPoint;
+    }
+
+    double[] features = new double[numFeatures];
+    for (int i = 0; i < numFeatures; i++) {
+      features[i] = featureValueMappers[i].call(record.get(featurePositions[i]));
+    }
+
+    // labelColumnPos is a record position, not a feature index, so it must not be used to index
+    // featureValueMappers; the label has its own mapper.
+    double label = labelValueMapper.call(record.get(labelColumnPos));
+    return new LabeledPoint(label, Vectors.dense(features));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java
new file mode 100644
index 0000000..15ba9ea
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+/**
+ * Value mapper for columns already known to hold doubles: a {@link Double} is returned unchanged and {@code null}
+ * maps to {@code 0.0}.
+ */
+public class DoubleValueMapper extends FeatureValueMapper {
+
+  /**
+   * Maps a column value to a double.
+   *
+   * @param input the column value, expected to be a {@link Double} or {@code null}
+   * @return the input itself, or {@code 0.0} when the input is {@code null}
+   * @throws IllegalArgumentException if the input is neither {@code null} nor a {@link Double}
+   */
+  @Override
+  public final Double call(Object input) {
+    if (input == null) {
+      return Double.valueOf(0d);
+    }
+    if (!(input instanceof Double)) {
+      throw new IllegalArgumentException("Invalid input expecting only doubles, but got " + input);
+    }
+    return (Double) input;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java
new file mode 100644
index 0000000..5e2ab49
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+import scala.Tuple2;
+
+/**
+ * Converts one (key, HCatRecord) pair, such as those produced by {@link HiveTableRDD}, into an MLlib
+ * {@link LabeledPoint}.
+ */
+public abstract class FeatureFunction implements Function<Tuple2<WritableComparable, HCatRecord>, LabeledPoint> {
+
+  /**
+   * Maps a (key, record) pair to a labeled point.
+   *
+   * @param tuple the input key and the HCatRecord holding the row
+   * @return the labeled point built from the record
+   * @throws Exception if the record cannot be converted
+   */
+  @Override
+  public abstract LabeledPoint call(Tuple2<WritableComparable, HCatRecord> tuple) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java
new file mode 100644
index 0000000..28c8787
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import java.io.Serializable;
+
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Converts a single raw column value into the {@code Double} representation used in MLlib feature vectors.
+ */
+public abstract class FeatureValueMapper implements Function<Object, Double>, Serializable {
+
+  /**
+   * Maps a raw column value to a double.
+   * <p>
+   * Narrows {@link Function#call(Object)}: implementations do not declare checked exceptions.
+   *
+   * @param input the raw column value, which may be {@code null}
+   * @return the mapped value
+   */
+  @Override
+  public abstract Double call(Object input);
+}

http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java
new file mode 100644
index 0000000..4960e1e
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Static helper that reads a Hive table into Spark as a pair RDD using {@link HCatInputFormat}.
+ */
+public final class HiveTableRDD {
+
+  public static final Log LOG = LogFactory.getLog(HiveTableRDD.class);
+
+  private HiveTableRDD() {
+    // Static utility; not meant to be instantiated.
+  }
+
+  /**
+   * Creates a pair RDD over the records of a Hive table.
+   * <p>
+   * The same {@code conf} instance is passed to {@link HCatInputFormat#setInput} and then to Spark, so the caller's
+   * configuration is modified (presumably with the HCatalog input settings; confirm against the Hive version in use).
+   *
+   * @param javaSparkContext the java spark context
+   * @param conf             the Hadoop configuration used to read the table
+   * @param db               the database name
+   * @param table            the table name
+   * @param partitionFilter  the partition filter passed to HCatInputFormat
+   * @return a pair RDD of (key, HCatRecord) for the table
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  public static JavaPairRDD<WritableComparable, HCatRecord> createHiveTableRDD(JavaSparkContext javaSparkContext,
+    Configuration conf, String db, String table, String partitionFilter) throws IOException {
+    HCatInputFormat.setInput(conf, db, table, partitionFilter);
+    // Arguments: configuration, input format class, key class, value class.
+    return javaSparkContext.newAPIHadoopRDD(conf, HCatInputFormat.class, WritableComparable.class,
+      HCatRecord.class);
+  }
+}