You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2015/04/15 21:50:02 UTC
[28/50] [abbrv] incubator-lens git commit: Lens-465 : Refactor ml
packages. (sharad)
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java
deleted file mode 100644
index bd50cba..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLRunner.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.lens.client.LensClient;
-import org.apache.lens.client.LensClientConfig;
-import org.apache.lens.client.LensMLClient;
-import org.apache.lens.ml.task.MLTask;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-public class MLRunner {
-
- private static final Log LOG = LogFactory.getLog(MLRunner.class);
-
- private LensMLClient mlClient;
- private String algoName;
- private String database;
- private String trainTable;
- private String trainFile;
- private String testTable;
- private String testFile;
- private String outputTable;
- private String[] features;
- private String labelColumn;
- private HiveConf conf;
-
- public void init(LensMLClient mlClient, String confDir) throws Exception {
- File dir = new File(confDir);
- File propFile = new File(dir, "ml.properties");
- Properties props = new Properties();
- props.load(new FileInputStream(propFile));
- String feat = props.getProperty("features");
- String trainFile = confDir + File.separator + "train.data";
- String testFile = confDir + File.separator + "test.data";
- init(mlClient, props.getProperty("algo"), props.getProperty("database"),
- props.getProperty("traintable"), trainFile,
- props.getProperty("testtable"), testFile,
- props.getProperty("outputtable"), feat.split(","),
- props.getProperty("labelcolumn"));
- }
-
- public void init(LensMLClient mlClient, String algoName,
- String database, String trainTable, String trainFile,
- String testTable, String testFile, String outputTable, String[] features,
- String labelColumn) {
- this.mlClient = mlClient;
- this.algoName = algoName;
- this.database = database;
- this.trainTable = trainTable;
- this.trainFile = trainFile;
- this.testTable = testTable;
- this.testFile = testFile;
- this.outputTable = outputTable;
- this.features = features;
- this.labelColumn = labelColumn;
- //hive metastore settings are loaded via lens-site.xml, so loading LensClientConfig
- //is required
- this.conf = new HiveConf(new LensClientConfig(), MLRunner.class);
- }
-
- public MLTask train() throws Exception {
- LOG.info("Starting train & eval");
-
- createTable(trainTable, trainFile);
- createTable(testTable, testFile);
- MLTask.Builder taskBuilder = new MLTask.Builder();
- taskBuilder.algorithm(algoName).hiveConf(conf).labelColumn(labelColumn).outputTable(outputTable)
- .client(mlClient).trainingTable(trainTable).testTable(testTable);
-
- // Add features
- for (String feature : features) {
- taskBuilder.addFeatureColumn(feature);
- }
- MLTask task = taskBuilder.build();
- LOG.info("Created task " + task.toString());
- task.run();
- return task;
- }
-
- public void createTable(String tableName, String dataFile) throws HiveException {
-
- File filedataFile = new File(dataFile);
- Path dataFilePath = new Path(filedataFile.toURI());
- Path partDir = dataFilePath.getParent();
-
- // Create table
- List<FieldSchema> columns = new ArrayList<FieldSchema>();
-
- // Label is optional. Not used for unsupervised models.
- // If present, label will be the first column, followed by features
- if (labelColumn != null) {
- columns.add(new FieldSchema(labelColumn, "double", "Labelled Column"));
- }
-
- for (String feature : features) {
- columns.add(new FieldSchema(feature, "double", "Feature " + feature));
- }
-
- Table tbl = Hive.get(conf).newTable(database + "." + tableName);
- tbl.setTableType(TableType.MANAGED_TABLE);
- tbl.getTTable().getSd().setCols(columns);
- // tbl.getTTable().getParameters().putAll(new HashMap<String, String>());
- tbl.setInputFormatClass(TextInputFormat.class);
- tbl.setSerdeParam(serdeConstants.LINE_DELIM, "\n");
- tbl.setSerdeParam(serdeConstants.FIELD_DELIM, " ");
-
- List<FieldSchema> partCols = new ArrayList<FieldSchema>(1);
- partCols.add(new FieldSchema("dummy_partition_col", "string", ""));
- tbl.setPartCols(partCols);
-
- Hive.get(conf).dropTable(database, tableName, false, true);
- Hive.get(conf).createTable(tbl, true);
- LOG.info("Created table " + tableName);
-
- // Add partition for the data file
- AddPartitionDesc partitionDesc = new AddPartitionDesc(database, tableName,
- false);
- Map<String, String> partSpec = new HashMap<String, String>();
- partSpec.put("dummy_partition_col", "dummy_val");
- partitionDesc.addPartition(partSpec, partDir.toUri().toString());
- Hive.get(conf).createPartitions(partitionDesc);
- LOG.info(tableName + ": Added partition " + partDir.toUri().toString());
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length < 1) {
- System.out.println("Usage: org.apache.lens.ml.MLRunner <ml-conf-dir>");
- System.exit(-1);
- }
- String confDir = args[0];
- LensMLClient client = new LensMLClient(new LensClient());
- MLRunner runner = new MLRunner();
- runner.init(client, confDir);
- runner.train();
- System.out.println("Created the Model successfully. Output Table: " + runner.outputTable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java
deleted file mode 100644
index 57adecc..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestMetric.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-/**
- * The Interface MLTestMetric.
- */
-public interface MLTestMetric {
- String getName();
-
- String getDescription();
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java
deleted file mode 100644
index 909e6df..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLTestReport.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.io.Serializable;
-import java.util.List;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-/**
- * Instantiates a new ML test report.
- */
-@NoArgsConstructor
-@ToString
-public class MLTestReport implements Serializable {
-
- /** The test table. */
- @Getter
- @Setter
- private String testTable;
-
- /** The output table. */
- @Getter
- @Setter
- private String outputTable;
-
- /** The output column. */
- @Getter
- @Setter
- private String outputColumn;
-
- /** The label column. */
- @Getter
- @Setter
- private String labelColumn;
-
- /** The feature columns. */
- @Getter
- @Setter
- private List<String> featureColumns;
-
- /** The algorithm. */
- @Getter
- @Setter
- private String algorithm;
-
- /** The model id. */
- @Getter
- @Setter
- private String modelID;
-
- /** The report id. */
- @Getter
- @Setter
- private String reportID;
-
- /** The query id. */
- @Getter
- @Setter
- private String queryID;
-
- /** The test output path. */
- @Getter
- @Setter
- private String testOutputPath;
-
- /** The prediction result column. */
- @Getter
- @Setter
- private String predictionResultColumn;
-
- /** The lens query id. */
- @Getter
- @Setter
- private String lensQueryID;
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java
deleted file mode 100644
index 2e240af..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MLUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.ServiceProvider;
-import org.apache.lens.server.api.ServiceProviderFactory;
-import org.apache.lens.server.ml.MLService;
-import org.apache.lens.server.ml.MLServiceImpl;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-
-public final class MLUtils {
- private MLUtils() {
- }
-
- private static final HiveConf HIVE_CONF;
-
- static {
- HIVE_CONF = new HiveConf();
- // Add default config so that we know the service provider implementation
- HIVE_CONF.addResource("lensserver-default.xml");
- HIVE_CONF.addResource("lens-site.xml");
- }
-
- public static String getAlgoName(Class<? extends MLAlgo> algoClass) {
- Algorithm annotation = algoClass.getAnnotation(Algorithm.class);
- if (annotation != null) {
- return annotation.name();
- }
- throw new IllegalArgumentException("Algo should be decorated with annotation - " + Algorithm.class.getName());
- }
-
- public static MLServiceImpl getMLService() throws Exception {
- return getServiceProvider().getService(MLService.NAME);
- }
-
- public static ServiceProvider getServiceProvider() throws Exception {
- Class<? extends ServiceProviderFactory> spfClass = HIVE_CONF.getClass(LensConfConstants.SERVICE_PROVIDER_FACTORY,
- null, ServiceProviderFactory.class);
- ServiceProviderFactory spf = spfClass.newInstance();
- return spf.getServiceProvider();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
deleted file mode 100644
index 429cbf9..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/ModelLoader.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-/**
- * Load ML models from a FS location.
- */
-public final class ModelLoader {
- private ModelLoader() {
- }
-
- /** The Constant MODEL_PATH_BASE_DIR. */
- public static final String MODEL_PATH_BASE_DIR = "lens.ml.model.basedir";
-
- /** The Constant MODEL_PATH_BASE_DIR_DEFAULT. */
- public static final String MODEL_PATH_BASE_DIR_DEFAULT = "file:///tmp";
-
- /** The Constant LOG. */
- public static final Log LOG = LogFactory.getLog(ModelLoader.class);
-
- /** The Constant TEST_REPORT_BASE_DIR. */
- public static final String TEST_REPORT_BASE_DIR = "lens.ml.test.basedir";
-
- /** The Constant TEST_REPORT_BASE_DIR_DEFAULT. */
- public static final String TEST_REPORT_BASE_DIR_DEFAULT = "file:///tmp/ml_reports";
-
- // Model cache settings
- /** The Constant MODEL_CACHE_SIZE. */
- public static final long MODEL_CACHE_SIZE = 10;
-
- /** The Constant MODEL_CACHE_TIMEOUT. */
- public static final long MODEL_CACHE_TIMEOUT = 3600000L; // one hour
-
- /** The model cache. */
- private static Cache<Path, MLModel> modelCache = CacheBuilder.newBuilder().maximumSize(MODEL_CACHE_SIZE)
- .expireAfterAccess(MODEL_CACHE_TIMEOUT, TimeUnit.MILLISECONDS).build();
-
- /**
- * Gets the model location.
- *
- * @param conf the conf
- * @param algorithm the algorithm
- * @param modelID the model id
- * @return the model location
- */
- public static Path getModelLocation(Configuration conf, String algorithm, String modelID) {
- String modelDataBaseDir = conf.get(MODEL_PATH_BASE_DIR, MODEL_PATH_BASE_DIR_DEFAULT);
- // Model location format - <modelDataBaseDir>/<algorithm>/modelID
- return new Path(new Path(new Path(modelDataBaseDir), algorithm), modelID);
- }
-
- /**
- * Load model.
- *
- * @param conf the conf
- * @param algorithm the algorithm
- * @param modelID the model id
- * @return the ML model
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static MLModel loadModel(Configuration conf, String algorithm, String modelID) throws IOException {
- final Path modelPath = getModelLocation(conf, algorithm, modelID);
- LOG.info("Loading model for algorithm: " + algorithm + " modelID: " + modelID + " At path: "
- + modelPath.toUri().toString());
- try {
- return modelCache.get(modelPath, new Callable<MLModel>() {
- @Override
- public MLModel call() throws Exception {
- FileSystem fs = modelPath.getFileSystem(new HiveConf());
- if (!fs.exists(modelPath)) {
- throw new IOException("Model path not found " + modelPath.toString());
- }
-
- ObjectInputStream ois = null;
- try {
- ois = new ObjectInputStream(fs.open(modelPath));
- MLModel model = (MLModel) ois.readObject();
- LOG.info("Loaded model " + model.getId() + " from location " + modelPath);
- return model;
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- } finally {
- IOUtils.closeQuietly(ois);
- }
- }
- });
- } catch (ExecutionException exc) {
- throw new IOException(exc);
- }
- }
-
- /**
- * Clear cache.
- */
- public static void clearCache() {
- modelCache.cleanUp();
- }
-
- /**
- * Gets the test report path.
- *
- * @param conf the conf
- * @param algorithm the algorithm
- * @param report the report
- * @return the test report path
- */
- public static Path getTestReportPath(Configuration conf, String algorithm, String report) {
- String testReportDir = conf.get(TEST_REPORT_BASE_DIR, TEST_REPORT_BASE_DIR_DEFAULT);
- return new Path(new Path(testReportDir, algorithm), report);
- }
-
- /**
- * Save test report.
- *
- * @param conf the conf
- * @param report the report
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static void saveTestReport(Configuration conf, MLTestReport report) throws IOException {
- Path reportDir = new Path(conf.get(TEST_REPORT_BASE_DIR, TEST_REPORT_BASE_DIR_DEFAULT));
- FileSystem fs = reportDir.getFileSystem(conf);
-
- if (!fs.exists(reportDir)) {
- LOG.info("Creating test report dir " + reportDir.toUri().toString());
- fs.mkdirs(reportDir);
- }
-
- Path algoDir = new Path(reportDir, report.getAlgorithm());
-
- if (!fs.exists(algoDir)) {
- LOG.info("Creating algorithm report dir " + algoDir.toUri().toString());
- fs.mkdirs(algoDir);
- }
-
- ObjectOutputStream reportOutputStream = null;
- Path reportSaveLocation;
- try {
- reportSaveLocation = new Path(algoDir, report.getReportID());
- reportOutputStream = new ObjectOutputStream(fs.create(reportSaveLocation));
- reportOutputStream.writeObject(report);
- reportOutputStream.flush();
- } catch (IOException ioexc) {
- LOG.error("Error saving test report " + report.getReportID(), ioexc);
- throw ioexc;
- } finally {
- IOUtils.closeQuietly(reportOutputStream);
- }
- LOG.info("Saved report " + report.getReportID() + " at location " + reportSaveLocation.toUri());
- }
-
- /**
- * Load report.
- *
- * @param conf the conf
- * @param algorithm the algorithm
- * @param reportID the report id
- * @return the ML test report
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static MLTestReport loadReport(Configuration conf, String algorithm, String reportID) throws IOException {
- Path reportLocation = getTestReportPath(conf, algorithm, reportID);
- FileSystem fs = reportLocation.getFileSystem(conf);
- ObjectInputStream reportStream = null;
- MLTestReport report = null;
-
- try {
- reportStream = new ObjectInputStream(fs.open(reportLocation));
- report = (MLTestReport) reportStream.readObject();
- } catch (IOException ioex) {
- LOG.error("Error reading report " + reportLocation, ioex);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- } finally {
- IOUtils.closeQuietly(reportStream);
- }
- return report;
- }
-
- /**
- * Delete model.
- *
- * @param conf the conf
- * @param algorithm the algorithm
- * @param modelID the model id
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static void deleteModel(HiveConf conf, String algorithm, String modelID) throws IOException {
- Path modelLocation = getModelLocation(conf, algorithm, modelID);
- FileSystem fs = modelLocation.getFileSystem(conf);
- fs.delete(modelLocation, false);
- }
-
- /**
- * Delete test report.
- *
- * @param conf the conf
- * @param algorithm the algorithm
- * @param reportID the report id
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static void deleteTestReport(HiveConf conf, String algorithm, String reportID) throws IOException {
- Path reportPath = getTestReportPath(conf, algorithm, reportID);
- reportPath.getFileSystem(conf).delete(reportPath, false);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java
deleted file mode 100644
index 4794c97..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/MultiPrediction.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.util.List;
-
-/**
- * The Interface MultiPrediction.
- */
-public interface MultiPrediction {
- List<LabelledPrediction> getPredictions();
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java
deleted file mode 100644
index 56f9a88..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/QueryRunner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import org.apache.lens.api.LensException;
-import org.apache.lens.api.LensSessionHandle;
-import org.apache.lens.api.query.QueryHandle;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- * Run a query against a Lens server.
- */
-public abstract class QueryRunner {
-
- /** The session handle. */
- protected final LensSessionHandle sessionHandle;
-
- @Getter @Setter
- protected String queryName;
-
- /**
- * Instantiates a new query runner.
- *
- * @param sessionHandle the session handle
- */
- public QueryRunner(LensSessionHandle sessionHandle) {
- this.sessionHandle = sessionHandle;
- }
-
- /**
- * Run query.
- *
- * @param query the query
- * @return the query handle
- * @throws LensException the lens exception
- */
- public abstract QueryHandle runQuery(String query) throws LensException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java
deleted file mode 100644
index f7fb1f8..0000000
--- a/lens-ml-lib/src/main/java/org/apache/lens/ml/TableTestingSpec.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.ml;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-
-import lombok.Getter;
-
-/**
- * Table specification for running test on a table.
- */
-public class TableTestingSpec {
-
- /** The Constant LOG. */
- public static final Log LOG = LogFactory.getLog(TableTestingSpec.class);
-
- /** The db. */
- private String db;
-
- /** The table containing input data. */
- private String inputTable;
-
- // TODO use partition condition
- /** The partition filter. */
- private String partitionFilter;
-
- /** The feature columns. */
- private List<String> featureColumns;
-
- /** The label column. */
- private String labelColumn;
-
- /** The output column. */
- private String outputColumn;
-
- /** The output table. */
- private String outputTable;
-
- /** The conf. */
- private transient HiveConf conf;
-
- /** The algorithm. */
- private String algorithm;
-
- /** The model id. */
- private String modelID;
-
- @Getter
- private boolean outputTableExists;
-
- @Getter
- private String testID;
-
- private HashMap<String, FieldSchema> columnNameToFieldSchema;
-
- /**
- * The Class TableTestingSpecBuilder.
- */
- public static class TableTestingSpecBuilder {
-
- /** The spec. */
- private final TableTestingSpec spec;
-
- /**
- * Instantiates a new table testing spec builder.
- */
- public TableTestingSpecBuilder() {
- spec = new TableTestingSpec();
- }
-
- /**
- * Database.
- *
- * @param database the database
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder database(String database) {
- spec.db = database;
- return this;
- }
-
- /**
- * Set the input table
- *
- * @param table the table
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder inputTable(String table) {
- spec.inputTable = table;
- return this;
- }
-
- /**
- * Partition filter for input table
- *
- * @param partFilter the part filter
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder partitionFilter(String partFilter) {
- spec.partitionFilter = partFilter;
- return this;
- }
-
- /**
- * Feature columns.
- *
- * @param featureColumns the feature columns
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder featureColumns(List<String> featureColumns) {
- spec.featureColumns = featureColumns;
- return this;
- }
-
- /**
- * Labe column.
- *
- * @param labelColumn the label column
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder lableColumn(String labelColumn) {
- spec.labelColumn = labelColumn;
- return this;
- }
-
- /**
- * Output column.
- *
- * @param outputColumn the output column
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder outputColumn(String outputColumn) {
- spec.outputColumn = outputColumn;
- return this;
- }
-
- /**
- * Output table.
- *
- * @param table the table
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder outputTable(String table) {
- spec.outputTable = table;
- return this;
- }
-
- /**
- * Hive conf.
- *
- * @param conf the conf
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder hiveConf(HiveConf conf) {
- spec.conf = conf;
- return this;
- }
-
- /**
- * Algorithm.
- *
- * @param algorithm the algorithm
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder algorithm(String algorithm) {
- spec.algorithm = algorithm;
- return this;
- }
-
- /**
- * Model id.
- *
- * @param modelID the model id
- * @return the table testing spec builder
- */
- public TableTestingSpecBuilder modelID(String modelID) {
- spec.modelID = modelID;
- return this;
- }
-
- /**
- * Builds the.
- *
- * @return the table testing spec
- */
- public TableTestingSpec build() {
- return spec;
- }
-
- /**
- * Set the unique test id
- *
- * @param testID
- * @return
- */
- public TableTestingSpecBuilder testID(String testID) {
- spec.testID = testID;
- return this;
- }
- }
-
- /**
- * New builder.
- *
- * @return the table testing spec builder
- */
- public static TableTestingSpecBuilder newBuilder() {
- return new TableTestingSpecBuilder();
- }
-
- /**
- * Validate.
- *
- * @return true, if successful
- */
- public boolean validate() {
- List<FieldSchema> columns;
- try {
- Hive metastoreClient = Hive.get(conf);
- Table tbl = (db == null) ? metastoreClient.getTable(inputTable) : metastoreClient.getTable(db, inputTable);
- columns = tbl.getAllCols();
- columnNameToFieldSchema = new HashMap<String, FieldSchema>();
-
- for (FieldSchema fieldSchema : columns) {
- columnNameToFieldSchema.put(fieldSchema.getName(), fieldSchema);
- }
-
- // Check if output table exists
- Table outTbl = metastoreClient.getTable(db == null ? "default" : db, outputTable, false);
- outputTableExists = (outTbl != null);
- } catch (HiveException exc) {
- LOG.error("Error getting table info " + toString(), exc);
- return false;
- }
-
- // Check if labeled column and feature columns are contained in the table
- List<String> testTableColumns = new ArrayList<String>(columns.size());
- for (FieldSchema column : columns) {
- testTableColumns.add(column.getName());
- }
-
- if (!testTableColumns.containsAll(featureColumns)) {
- LOG.info("Invalid feature columns: " + featureColumns + ". Actual columns in table:" + testTableColumns);
- return false;
- }
-
- if (!testTableColumns.contains(labelColumn)) {
- LOG.info("Invalid label column: " + labelColumn + ". Actual columns in table:" + testTableColumns);
- return false;
- }
-
- if (StringUtils.isBlank(outputColumn)) {
- LOG.info("Output column is required");
- return false;
- }
-
- if (StringUtils.isBlank(outputTable)) {
- LOG.info("Output table is required");
- return false;
- }
- return true;
- }
-
- public String getTestQuery() {
- if (!validate()) {
- return null;
- }
-
- // We always insert a dynamic partition
- StringBuilder q = new StringBuilder("INSERT OVERWRITE TABLE " + outputTable + " PARTITION (part_testid='" + testID
- + "') SELECT ");
- String featureCols = StringUtils.join(featureColumns, ",");
- q.append(featureCols).append(",").append(labelColumn).append(", ").append("predict(").append("'").append(algorithm)
- .append("', ").append("'").append(modelID).append("', ").append(featureCols).append(") ").append(outputColumn)
- .append(" FROM ").append(inputTable);
-
- return q.toString();
- }
-
- public String getCreateOutputTableQuery() {
- StringBuilder createTableQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS ").append(outputTable).append("(");
- // Output table contains feature columns, label column, output column
- List<String> outputTableColumns = new ArrayList<String>();
- for (String featureCol : featureColumns) {
- outputTableColumns.add(featureCol + " " + columnNameToFieldSchema.get(featureCol).getType());
- }
-
- outputTableColumns.add(labelColumn + " " + columnNameToFieldSchema.get(labelColumn).getType());
- outputTableColumns.add(outputColumn + " string");
-
- createTableQuery.append(StringUtils.join(outputTableColumns, ", "));
-
- // Append partition column
- createTableQuery.append(") PARTITIONED BY (part_testid string)");
-
- return createTableQuery.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java
new file mode 100644
index 0000000..e0d13c0
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/AlgoParam.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field as a named algorithm parameter. Retained at runtime so it can be discovered via reflection.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface AlgoParam {
+
+ /**
+ * Name under which the parameter is exposed.
+ *
+ * @return the parameter name
+ */
+ String name();
+
+ /**
+ * Human-readable help text describing the parameter.
+ *
+ * @return the help text
+ */
+ String help();
+
+ /**
+ * Default value of the parameter in string form.
+ *
+ * @return the default value, or the literal "None" when the annotation does not specify one
+ */
+ String defaultValue() default "None";
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java
new file mode 100644
index 0000000..29bde29
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/Algorithm.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a class as an ML algorithm and carries its name and description. Retained at runtime for reflection.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface Algorithm {
+
+ /**
+ * Name identifying the algorithm.
+ *
+ * @return the algorithm name
+ */
+ String name();
+
+ /**
+ * Human-readable description of the algorithm.
+ *
+ * @return the algorithm description
+ */
+ String description();
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java
new file mode 100644
index 0000000..44b0043
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLAlgo.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensException;
+
+/**
+ * Contract for a trainable ML algorithm: named, configurable, and able to train a model from a table.
+ */
+public interface MLAlgo {
+ String getName();
+
+ String getDescription();
+
+ /**
+ * Supplies the configuration this algo should use.
+ *
+ * @param configuration the configuration
+ */
+ void configure(LensConf configuration);
+
+ LensConf getConf();
+
+ /**
+ * Trains a model on the given table.
+ *
+ * @param conf the configuration to use for this training run
+ * @param db the database containing the training table
+ * @param table the training table
+ * @param modelId the id to assign to the trained model
+ * @param params algorithm-specific training arguments
+ * @return the trained ML model
+ * @throws LensException if training fails
+ */
+ MLModel train(LensConf conf, String db, String table, String modelId, String... params) throws LensException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java
new file mode 100644
index 0000000..1aa699d
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLDriver.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.util.List;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensException;
+
+/**
+ * Contract for a driver that hosts ML algorithms and has an init/start/stop lifecycle.
+ */
+public interface MLDriver {
+
+ /**
+ * Checks if is algo supported.
+ *
+ * @param algo the algo name
+ * @return true, if the algo is supported by this driver
+ */
+ boolean isAlgoSupported(String algo);
+
+ /**
+ * Gets an instance of the named algo.
+ *
+ * @param algo the algo name
+ * @return the algo instance
+ * @throws LensException if the instance cannot be obtained
+ */
+ MLAlgo getAlgoInstance(String algo) throws LensException;
+
+ /**
+ * Initializes the driver with the given configuration.
+ *
+ * @param conf the conf
+ * @throws LensException if initialization fails
+ */
+ void init(LensConf conf) throws LensException;
+
+ /**
+ * Starts the driver.
+ *
+ * @throws LensException if the driver cannot be started
+ */
+ void start() throws LensException;
+
+ /**
+ * Stops the driver.
+ *
+ * @throws LensException if the driver cannot be stopped
+ */
+ void stop() throws LensException;
+
+ List<String> getAlgoNames();
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java
new file mode 100644
index 0000000..73717ac
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/api/MLModel.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.api;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.List;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Base class for trained models: carries training metadata and predicts a value of type {@code PREDICTION}.
+ */
+@NoArgsConstructor
+@ToString
+public abstract class MLModel<PREDICTION> implements Serializable {
+
+ /** The model id. */
+ @Getter
+ @Setter
+ private String id;
+
+ /** The creation time. */
+ @Getter
+ @Setter
+ private Date createdAt;
+
+ /** The algo name. */
+ @Getter
+ @Setter
+ private String algoName;
+
+ /** The table associated with the model. */
+ @Getter
+ @Setter
+ private String table;
+
+ /** The params associated with the model. */
+ @Getter
+ @Setter
+ private List<String> params;
+
+ /** The label column. */
+ @Getter
+ @Setter
+ private String labelColumn;
+
+ /** The feature columns. */
+ @Getter
+ @Setter
+ private List<String> featureColumns;
+
+ /**
+ * Predicts a value for the given inputs.
+ *
+ * @param args the model inputs
+ * @return the prediction
+ */
+ public abstract PREDICTION predict(Object... args);
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java
new file mode 100644
index 0000000..51979d8
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/AlgoArgParser.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.ml.algo.api.AlgoParam;
+import org.apache.lens.ml.algo.api.MLAlgo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The Class AlgoArgParser.
+ */
+public final class AlgoArgParser {
+ private AlgoArgParser() {
+ }
+
+ /**
+ * The Class CustomArgParser.
+ *
+ * @param <E> the element type
+ */
+ public abstract static class CustomArgParser<E> {
+
+ /**
+ * Parses the.
+ *
+ * @param value the value
+ * @return the e
+ */
+ public abstract E parse(String value);
+ }
+
+ /** The Constant LOG. */
+ public static final Log LOG = LogFactory.getLog(AlgoArgParser.class);
+
+ /**
+ * Extracts feature names. If the algo has any parameters associated with @AlgoParam annotation, those are set
+ * as well.
+ *
+ * @param algo the algo
+ * @param args the args
+ * @return List of feature column names.
+ */
+ public static List<String> parseArgs(MLAlgo algo, String[] args) {
+ List<String> featureColumns = new ArrayList<String>();
+ Class<? extends MLAlgo> algoClass = algo.getClass();
+ // Get param fields
+ Map<String, Field> fieldMap = new HashMap<String, Field>();
+
+ for (Field fld : algoClass.getDeclaredFields()) {
+ fld.setAccessible(true);
+ AlgoParam paramAnnotation = fld.getAnnotation(AlgoParam.class);
+ if (paramAnnotation != null) {
+ fieldMap.put(paramAnnotation.name(), fld);
+ }
+ }
+
+ for (int i = 0; i < args.length; i += 2) {
+ String key = args[i].trim();
+ String value = args[i + 1].trim();
+
+ try {
+ if ("feature".equalsIgnoreCase(key)) {
+ featureColumns.add(value);
+ } else if (fieldMap.containsKey(key)) {
+ Field f = fieldMap.get(key);
+ if (String.class.equals(f.getType())) {
+ f.set(algo, value);
+ } else if (Integer.TYPE.equals(f.getType())) {
+ f.setInt(algo, Integer.parseInt(value));
+ } else if (Double.TYPE.equals(f.getType())) {
+ f.setDouble(algo, Double.parseDouble(value));
+ } else if (Long.TYPE.equals(f.getType())) {
+ f.setLong(algo, Long.parseLong(value));
+ } else {
+ // check if the algo provides a deserializer for this param
+ String customParserClass = algo.getConf().getProperties().get("lens.ml.args." + key);
+ if (customParserClass != null) {
+ Class<? extends CustomArgParser<?>> clz = (Class<? extends CustomArgParser<?>>) Class
+ .forName(customParserClass);
+ CustomArgParser<?> parser = clz.newInstance();
+ f.set(algo, parser.parse(value));
+ } else {
+ LOG.warn("Ignored param " + key + "=" + value + " as no parser found");
+ }
+ }
+ }
+ } catch (Exception exc) {
+ LOG.error("Error while setting param " + key + " to " + value + " for algo " + algo);
+ }
+ }
+ return featureColumns;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java
new file mode 100644
index 0000000..a2fd94b
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/Algorithms.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.api.LensException;
+import org.apache.lens.ml.algo.api.Algorithm;
+import org.apache.lens.ml.algo.api.MLAlgo;
+
+/**
+ * Registry of algorithm classes keyed by the name declared in their {@link Algorithm} annotation.
+ */
+public class Algorithms {
+
+ /** Registered algorithm classes, keyed by algorithm name. */
+ private final Map<String, Class<? extends MLAlgo>> algorithmClasses
+ = new HashMap<String, Class<? extends MLAlgo>>();
+
+ /**
+ * Registers an algorithm class under its annotated name.
+ *
+ * @param algoClass the algo class; must be non-null and carry an {@link Algorithm} annotation
+ */
+ public void register(Class<? extends MLAlgo> algoClass) {
+ Algorithm annotation = algoClass == null ? null : algoClass.getAnnotation(Algorithm.class);
+ if (annotation == null) {
+ throw new IllegalArgumentException("Not a valid algorithm class: " + algoClass);
+ }
+ algorithmClasses.put(annotation.name(), algoClass);
+ }
+
+ /**
+ * Creates a new instance of the algo registered under the given name.
+ *
+ * @param name the name
+ * @return a new algo instance, or null when no class is registered under the name
+ * @throws LensException if the algo cannot be instantiated
+ */
+ public MLAlgo getAlgoForName(String name) throws LensException {
+ Class<? extends MLAlgo> registered = algorithmClasses.get(name);
+ if (registered == null) {
+ return null;
+ }
+ // The description is read before entering the try block so only instantiation failures are wrapped
+ String description = registered.getAnnotation(Algorithm.class).description();
+ try {
+ Constructor<? extends MLAlgo> ctor = registered.getConstructor(String.class, String.class);
+ return ctor.newInstance(name, description);
+ } catch (Exception exc) {
+ throw new LensException("Unable to get algo: " + name, exc);
+ }
+ }
+
+ /**
+ * Tells whether an algo is registered under the given name.
+ *
+ * @param name the name
+ * @return true, if an algo class is registered under the name
+ */
+ public boolean isAlgoSupported(String name) {
+ return algorithmClasses.containsKey(name);
+ }
+
+ /** Returns a new list holding the names of all registered algos. */
+ public List<String> getAlgorithmNames() {
+ return new ArrayList<String>(algorithmClasses.keySet());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java
new file mode 100644
index 0000000..a960a4a
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ClassifierBaseModel.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import org.apache.lens.ml.algo.api.MLModel;
+
+/**
+ * Return a single double value as a prediction. This is useful in classifiers where the classifier returns a single
+ * class label as a prediction.
+ */
+public abstract class ClassifierBaseModel extends MLModel<Double> {
+
+ /**
+ * Gets the feature vector. Any {@link Number} is converted with doubleValue(), a String is parsed as a double, and
+ * every other value (including null) becomes 0.0.
+ * @param args the args
+ * @return the feature vector
+ */
+ public final double[] getFeatureVector(Object[] args) {
+ double[] features = new double[args.length];
+ for (int i = 0; i < args.length; i++) {
+ if (args[i] instanceof Number) {
+ features[i] = ((Number) args[i]).doubleValue();
+ } else if (args[i] instanceof String) {
+ features[i] = Double.parseDouble((String) args[i]);
+ } else {
+ features[i] = 0.0;
+ }
+ }
+ return features;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java
new file mode 100644
index 0000000..16a6180
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/ForecastingModel.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.util.List;
+
+import org.apache.lens.ml.algo.api.MLModel;
+
+/**
+ * Model whose prediction is a {@link MultiPrediction}, i.e. a list of labelled values.
+ */
+public class ForecastingModel extends MLModel<MultiPrediction> {
+
+ /*
+ * Placeholder implementation: ignores args and wraps a null prediction list.
+ *
+ * @see org.apache.lens.ml.algo.api.MLModel#predict(java.lang.Object[])
+ */
+ @Override
+ public MultiPrediction predict(Object... args) {
+ return new ForecastingPredictions(null);
+ }
+
+ /**
+ * A {@link MultiPrediction} that returns the list it was constructed with.
+ */
+ public static class ForecastingPredictions implements MultiPrediction {
+
+ /** The labelled predictions; null when constructed with null. */
+ private final List<LabelledPrediction> values;
+
+ /**
+ * Instantiates a new forecasting predictions.
+ *
+ * @param values the labelled predictions, stored as given without copying
+ */
+ public ForecastingPredictions(List<LabelledPrediction> values) {
+ this.values = values;
+ }
+
+ @Override
+ public List<LabelledPrediction> getPredictions() {
+ return values;
+ }
+ }
+
+ /**
+ * A single forecast entry: a timestamp label paired with a double value.
+ */
+ public static class ForecastingLabel implements LabelledPrediction<Long, Double> {
+
+ /** The timestamp, used as the label. */
+ private final Long timestamp;
+
+ /** The value, used as the prediction. */
+ private final double value;
+
+ /**
+ * Instantiates a new forecasting label.
+ *
+ * @param timestamp the timestamp
+ * @param value the value
+ */
+ public ForecastingLabel(long timestamp, double value) {
+ this.timestamp = timestamp;
+ this.value = value;
+ }
+
+ @Override
+ public Long getLabel() {
+ return timestamp;
+ }
+
+ @Override
+ public Double getPrediction() {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java
new file mode 100644
index 0000000..9c7737e
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/LabelledPrediction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+/**
+ * Prediction type used when the model prediction is of a complex type. For example, in forecasting the predictions
+ * are a series of timestamp and value pairs.
+ *
+ * @param <LABELTYPE> the type of the label, e.g. a timestamp
+ * @param <PREDICTIONTYPE> the type of the value predicted for the label
+ */
+public interface LabelledPrediction<LABELTYPE, PREDICTIONTYPE> {
+ LABELTYPE getLabel();
+
+ PREDICTIONTYPE getPrediction();
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java
new file mode 100644
index 0000000..e910a92
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/lib/MultiPrediction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.lib;
+
+import java.util.List;
+
+/**
+ * A prediction made up of several labelled predictions.
+ */
+public interface MultiPrediction {
+ List<LabelledPrediction> getPredictions();
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java
new file mode 100644
index 0000000..4012085
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkAlgo.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensException;
+import org.apache.lens.ml.algo.api.AlgoParam;
+import org.apache.lens.ml.algo.api.Algorithm;
+import org.apache.lens.ml.algo.api.MLAlgo;
+import org.apache.lens.ml.algo.api.MLModel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Base class for Spark based algos: parses the training params, builds the training RDD and delegates to subclasses.
+ */
+public abstract class BaseSparkAlgo implements MLAlgo {
+
+ /** The Constant LOG. */
+ public static final Log LOG = LogFactory.getLog(BaseSparkAlgo.class);
+
+ /** The name. */
+ private final String name;
+
+ /** The description. */
+ private final String description;
+
+ /** The spark context. */
+ protected JavaSparkContext sparkContext;
+
+ /** Params other than features and label, keyed by param name with dashes removed. */
+ protected Map<String, String> params;
+
+ /** The conf. */
+ protected transient LensConf conf;
+
+ /** The training fraction. */
+ @AlgoParam(name = "trainingFraction", help = "% of dataset to be used for training", defaultValue = "0")
+ protected double trainingFraction;
+
+ /** Whether a training fraction was supplied in the params. */
+ private boolean useTrainingFraction;
+
+ /** The label. */
+ @AlgoParam(name = "label", help = "Name of column which is used as a training label for supervised learning")
+ protected String label;
+
+ /** The partition filter. */
+ @AlgoParam(name = "partition", help = "Partition filter used to create create HCatInputFormats")
+ protected String partitionFilter;
+
+ /** The features. */
+ @AlgoParam(name = "feature", help = "Column name(s) which are to be used as sample features")
+ protected List<String> features;
+
+ /**
+ * Instantiates a new base spark algo.
+ *
+ * @param name the name
+ * @param description the description
+ */
+ public BaseSparkAlgo(String name, String description) {
+ this.name = name;
+ this.description = description;
+ }
+
+ public void setSparkContext(JavaSparkContext sparkContext) {
+ this.sparkContext = sparkContext;
+ }
+
+ @Override
+ public LensConf getConf() {
+ return conf;
+ }
+
+ /*
+ * Stores the configuration; it is returned as-is by getConf().
+ *
+ * @see org.apache.lens.ml.algo.api.MLAlgo#configure(org.apache.lens.api.LensConf)
+ */
+ @Override
+ public void configure(LensConf configuration) {
+ this.conf = configuration;
+ }
+
+ /*
+ * Parses the params, builds the training RDD for the table and delegates to trainInternal. Throws
+ * IllegalArgumentException when the params are malformed or name no feature column.
+ * @see org.apache.lens.ml.algo.api.MLAlgo#train(org.apache.lens.api.LensConf, java.lang.String, java.lang.String,
+ * java.lang.String, java.lang.String[])
+ */
+ @Override
+ public MLModel<?> train(LensConf conf, String db, String table, String modelId, String... params)
+ throws LensException {
+ parseParams(params);
+ // Fail fast with a clear message; features stays null when no feature param is passed
+ if (features == null || features.isEmpty()) {
+ throw new IllegalArgumentException("No feature columns specified for algo " + name);
+ }
+
+ TableTrainingSpec.TableTrainingSpecBuilder builder = TableTrainingSpec.newBuilder().hiveConf(toHiveConf(conf))
+ .database(db).table(table).partitionFilter(partitionFilter).featureColumns(features).labelColumn(label);
+ if (useTrainingFraction) {
+ builder.trainingFraction(trainingFraction);
+ }
+ TableTrainingSpec spec = builder.build();
+ LOG.info("Training " + name + " with " + features.size() + " features");
+ spec.createRDDs(sparkContext);
+ RDD<LabeledPoint> trainingRDD = spec.getTrainingRDD();
+ BaseSparkClassificationModel<?> model = trainInternal(modelId, trainingRDD);
+ model.setTable(table);
+ model.setParams(Arrays.asList(params));
+ model.setLabelColumn(label);
+ model.setFeatureColumns(features);
+ return model;
+ }
+
+ /**
+ * Copies every property of the given conf into a new HiveConf.
+ *
+ * @param conf the conf
+ * @return the hive conf
+ */
+ protected HiveConf toHiveConf(LensConf conf) {
+ HiveConf hiveConf = new HiveConf();
+ for (String key : conf.getProperties().keySet()) {
+ hiveConf.set(key, conf.getProperties().get(key));
+ }
+ return hiveConf;
+ }
+
+ /**
+ * Parses alternating key/value params into features, label, training fraction, partition filter and algo params.
+ * Throws IllegalArgumentException on an odd number of args or an unparsable training fraction.
+ * @param args the args
+ */
+ public void parseParams(String[] args) {
+ if (args.length % 2 != 0) {
+ throw new IllegalArgumentException("Invalid number of params " + args.length);
+ }
+
+ params = new LinkedHashMap<String, String>();
+
+ for (int i = 0; i < args.length; i += 2) {
+ if ("f".equalsIgnoreCase(args[i]) || "feature".equalsIgnoreCase(args[i])) {
+ if (features == null) {
+ features = new ArrayList<String>();
+ }
+ features.add(args[i + 1]);
+ } else if ("l".equalsIgnoreCase(args[i]) || "label".equalsIgnoreCase(args[i])) {
+ label = args[i + 1];
+ } else {
+ params.put(args[i].replaceAll("\\-+", ""), args[i + 1]);
+ }
+ }
+
+ if (params.containsKey("trainingFraction")) {
+ // Get training Fraction
+ String trainingFractionStr = params.get("trainingFraction");
+ try {
+ trainingFraction = Double.parseDouble(trainingFractionStr);
+ useTrainingFraction = true;
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("Invalid training fraction", nfe);
+ }
+ }
+
+ if (params.containsKey("partition") || params.containsKey("p")) {
+ partitionFilter = params.containsKey("partition") ? params.get("partition") : params.get("p");
+ }
+
+ parseAlgoParams(params);
+ }
+
+ /**
+ * Gets a param as a double, falling back to the default when it is absent, unparsable or no params were parsed yet.
+ *
+ * @param param the param
+ * @param defaultVal the default val
+ * @return the param value
+ */
+ public double getParamValue(String param, double defaultVal) {
+ if (params != null && params.containsKey(param)) {
+ try {
+ return Double.parseDouble(params.get(param));
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Couldn't parse param value: " + param + " as double.");
+ }
+ }
+ return defaultVal;
+ }
+
+ /**
+ * Gets a param as an int, falling back to the default when it is absent, unparsable or no params were parsed yet.
+ *
+ * @param param the param
+ * @param defaultVal the default val
+ * @return the param value
+ */
+ public int getParamValue(String param, int defaultVal) {
+ if (params != null && params.containsKey(param)) {
+ try {
+ return Integer.parseInt(params.get(param));
+ } catch (NumberFormatException nfe) {
+ LOG.warn("Couldn't parse param value: " + param + " as integer.");
+ }
+ }
+ return defaultVal;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public Map<String, String> getArgUsage() {
+ Map<String, String> usage = new LinkedHashMap<String, String>();
+ Class<?> clz = this.getClass();
+ // Put class name and description as well as part of the usage
+ Algorithm algorithm = clz.getAnnotation(Algorithm.class);
+ if (algorithm != null) {
+ usage.put("Algorithm Name", algorithm.name());
+ usage.put("Algorithm Description", algorithm.description());
+ }
+
+ // Get all algo params including base algo params
+ while (clz != null) {
+ for (Field field : clz.getDeclaredFields()) {
+ AlgoParam param = field.getAnnotation(AlgoParam.class);
+ if (param != null) {
+ usage.put("[param] " + param.name(), param.help() + " Default Value = " + param.defaultValue());
+ }
+ }
+
+ if (clz.equals(BaseSparkAlgo.class)) {
+ break;
+ }
+ clz = clz.getSuperclass();
+ }
+ return usage;
+ }
+
+ /**
+ * Parses the algo specific params; called at the end of parseParams with the generic params map.
+ *
+ * @param params the params
+ */
+ public abstract void parseAlgoParams(Map<String, String> params);
+
+ /**
+ * Trains the model on the prepared training RDD.
+ *
+ * @param modelId the model id
+ * @param trainingRDD the training rdd
+ * @return the base spark classification model
+ * @throws LensException the lens exception
+ */
+ protected abstract BaseSparkClassificationModel trainInternal(String modelId, RDD<LabeledPoint> trainingRDD)
+ throws LensException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java
new file mode 100644
index 0000000..806dc1f
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/BaseSparkClassificationModel.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import org.apache.lens.ml.algo.lib.ClassifierBaseModel;
+
+import org.apache.spark.mllib.classification.ClassificationModel;
+import org.apache.spark.mllib.linalg.Vectors;
+
+/**
+ * Classifier model that delegates prediction to a wrapped Spark MLLib {@link ClassificationModel}.
+ *
+ * @param <MODEL> the concrete Spark classification model type being wrapped
+ */
+public class BaseSparkClassificationModel<MODEL extends ClassificationModel> extends ClassifierBaseModel {
+
+  /** Identifier returned by {@link #getId()}. */
+  private final String modelId;
+
+  /** Spark model that computes the predictions. */
+  private final MODEL sparkModel;
+
+  /**
+   * Wraps the given Spark model under the given id.
+   *
+   * @param modelId the model id
+   * @param model   the Spark model to delegate to
+   */
+  public BaseSparkClassificationModel(String modelId, MODEL model) {
+    this.sparkModel = model;
+    this.modelId = modelId;
+  }
+
+  /**
+   * Converts the arguments with {@code getFeatureVector}, wraps the result in a dense vector and asks the Spark
+   * model for its prediction.
+   *
+   * @param args the feature values
+   * @return the prediction of the wrapped Spark model
+   */
+  @Override
+  public Double predict(Object... args) {
+    final Double prediction = sparkModel.predict(Vectors.dense(getFeatureVector(args)));
+    return prediction;
+  }
+
+  /**
+   * @return the model id given at construction time
+   */
+  @Override
+  public String getId() {
+    return this.modelId;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java
new file mode 100644
index 0000000..d75efc0
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/ColumnFeatureFunction.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.log4j.Logger;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+import com.google.common.base.Preconditions;
+import scala.Tuple2;
+
+/**
+ * A feature function that directly maps an HCatRecord to a feature vector. Each column becomes a feature in the
+ * vector, with the value of the feature obtained using the value mapper for that column.
+ */
+public class ColumnFeatureFunction extends FeatureFunction {
+
+  /** The Constant LOG. */
+  public static final Logger LOG = Logger.getLogger(ColumnFeatureFunction.class);
+
+  /** Value mappers, parallel to {@link #featurePositions}. */
+  private final FeatureValueMapper[] featureValueMappers;
+
+  /** Position of each feature column in the HCatRecord. */
+  private final int[] featurePositions;
+
+  /** Position of the label column in the HCatRecord. */
+  private final int labelColumnPos;
+
+  /** Number of entries in every feature vector produced. */
+  private final int numFeatures;
+
+  /** Labeled point returned for null records: the default label with an all-zero feature vector. */
+  private final LabeledPoint defaultLabeledPoint;
+
+  /**
+   * Feature positions and value mappers are parallel arrays. featurePositions[i] gives the position of ith feature in
+   * the HCatRecord, and valueMappers[i] gives the value mapper used to map that feature to a Double value
+   *
+   * @param featurePositions position number of feature column in the HCatRecord
+   * @param valueMappers     mapper for each column position
+   * @param labelColumnPos   position of the label column
+   * @param numFeatures      number of features in the feature vector; must not exceed the number of feature positions
+   * @param defaultLabel     default label to be used for null records
+   * @throws NullPointerException     if valueMappers or featurePositions is null
+   * @throws IllegalArgumentException if the two arrays differ in length, or numFeatures exceeds their length
+   */
+  public ColumnFeatureFunction(int[] featurePositions, FeatureValueMapper[] valueMappers, int labelColumnPos,
+    int numFeatures, double defaultLabel) {
+    Preconditions.checkNotNull(valueMappers, "Value mappers argument is required");
+    Preconditions.checkNotNull(featurePositions, "Feature positions are required");
+    Preconditions.checkArgument(valueMappers.length == featurePositions.length,
+      "Mismatch between number of value mappers and feature positions");
+    // call() reads the first numFeatures entries of both arrays. Reject an oversized count here instead of failing
+    // with an ArrayIndexOutOfBoundsException on every record.
+    Preconditions.checkArgument(numFeatures <= featurePositions.length,
+      "Number of features exceeds the number of feature positions");
+
+    this.featurePositions = featurePositions;
+    this.featureValueMappers = valueMappers;
+    this.labelColumnPos = labelColumnPos;
+    this.numFeatures = numFeatures;
+    defaultLabeledPoint = new LabeledPoint(defaultLabel, Vectors.dense(new double[numFeatures]));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Only the record (second element of the tuple) is used. A null record yields the default labeled point.
+   *
+   * @see org.apache.lens.ml.algo.spark.FeatureFunction#call(scala.Tuple2)
+   */
+  @Override
+  public LabeledPoint call(Tuple2<WritableComparable, HCatRecord> tuple) throws Exception {
+    HCatRecord record = tuple._2();
+
+    if (record == null) {
+      LOG.info("@@@ Null record");
+      return defaultLabeledPoint;
+    }
+
+    double[] features = new double[numFeatures];
+
+    for (int i = 0; i < numFeatures; i++) {
+      int featurePos = featurePositions[i];
+      features[i] = featureValueMappers[i].call(record.get(featurePos));
+    }
+
+    // NOTE(review): the mapper for the label is looked up by the label's column position in the record, while the
+    // mapper array is indexed by feature number. This fails when labelColumnPos is not a valid index into the
+    // mapper array -- confirm which mapper is intended for the label.
+    double label = featureValueMappers[labelColumnPos].call(record.get(labelColumnPos));
+    return new LabeledPoint(label, Vectors.dense(features));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java
new file mode 100644
index 0000000..15ba9ea
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/DoubleValueMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+/**
+ * Value mapper for inputs that already are doubles: a Double is handed back unchanged and null is mapped to zero.
+ */
+public class DoubleValueMapper extends FeatureValueMapper {
+
+  /*
+   * (non-Javadoc)
+   *
+   * Anything other than null or a Double is rejected with an IllegalArgumentException.
+   *
+   * @see org.apache.lens.ml.algo.spark.FeatureValueMapper#call(java.lang.Object)
+   */
+  @Override
+  public final Double call(Object input) {
+    // A missing value counts as 0.0
+    if (input == null) {
+      return Double.valueOf(0d);
+    }
+
+    if (!(input instanceof Double)) {
+      throw new IllegalArgumentException("Invalid input expecting only doubles, but got " + input);
+    }
+
+    return (Double) input;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java
new file mode 100644
index 0000000..5e2ab49
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+import scala.Tuple2;
+
+/**
+ * Function to map an HCatRecord to a feature vector usable by MLLib.
+ *
+ * <p>The input is a (key, HCatRecord) tuple and the output is an MLLib {@link LabeledPoint}. Subclasses decide how
+ * the record's columns become the label and the features.
+ */
+public abstract class FeatureFunction implements Function<Tuple2<WritableComparable, HCatRecord>, LabeledPoint> {
+
+ /*
+ * (non-Javadoc)
+ *
+ * Re-declared as abstract with the concrete tuple and result types, so subclasses implement the mapping from one
+ * (key, record) tuple to one labeled point.
+ *
+ * @see org.apache.spark.api.java.function.Function#call(java.lang.Object)
+ */
+ @Override
+ public abstract LabeledPoint call(Tuple2<WritableComparable, HCatRecord> tuple) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java
new file mode 100644
index 0000000..28c8787
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/FeatureValueMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import java.io.Serializable;
+
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Map a feature value to a Double value usable by MLLib.
+ *
+ * <p>Implementations receive the raw value as an Object and return its Double representation.
+ */
+public abstract class FeatureValueMapper implements Function<Object, Double>, Serializable {
+
+ /*
+ * (non-Javadoc)
+ *
+ * Declared here without a throws clause, so implementations cannot throw checked exceptions.
+ *
+ * @see org.apache.spark.api.java.function.Function#call(java.lang.Object)
+ */
+ public abstract Double call(Object input);
+}
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/0f5ea4c7/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java
----------------------------------------------------------------------
diff --git a/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java
new file mode 100644
index 0000000..4960e1e
--- /dev/null
+++ b/lens-ml-lib/src/main/java/org/apache/lens/ml/algo/spark/HiveTableRDD.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.ml.algo.spark;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Static factory for a JavaPairRDD over a Hive table, read through HCatInputFormat.
+ */
+public final class HiveTableRDD {
+
+  public static final Log LOG = LogFactory.getLog(HiveTableRDD.class);
+
+  /** Not instantiable: this class only offers a static factory method. */
+  private HiveTableRDD() {
+  }
+
+  /**
+   * Creates a pair RDD over the given Hive table.
+   *
+   * <p>The table and partition filter are first registered with {@code HCatInputFormat.setInput} using the supplied
+   * configuration, and that same configuration is then handed to Spark to create the RDD.
+   *
+   * @param javaSparkContext the java spark context used to create the RDD
+   * @param conf             the configuration passed to HCatInputFormat and to Spark
+   * @param db               the database of the table
+   * @param table            the table to read
+   * @param partitionFilter  the partition filter passed on to HCatInputFormat
+   * @return pair RDD of WritableComparable keys and HCatRecord values
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  public static JavaPairRDD<WritableComparable, HCatRecord> createHiveTableRDD(JavaSparkContext javaSparkContext,
+    Configuration conf, String db, String table, String partitionFilter) throws IOException {
+    HCatInputFormat.setInput(conf, db, table, partitionFilter);
+
+    // Arguments after conf: input format class, key class, value class
+    return javaSparkContext.newAPIHadoopRDD(conf, HCatInputFormat.class, WritableComparable.class,
+      HCatRecord.class);
+  }
+}