You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ch...@apache.org on 2019/01/21 15:28:06 UTC
[ignite] branch master updated: IGNITE-10968: [ML] Create new
ignite module SparkMLModelImport and add LogRegression converter
This is an automated email from the ASF dual-hosted git repository.
chief pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3547987 IGNITE-10968: [ML] Create new ignite module SparkMLModelImport and add LogRegression converter
3547987 is described below
commit 35479875c4ca9905492e3e910fc46f0179c5c907
Author: zaleslaw <za...@gmail.com>
AuthorDate: Mon Jan 21 18:28:00 2019 +0300
IGNITE-10968: [ML] Create new ignite module SparkMLModelImport and
add LogRegression converter
This closes #5851
---
examples/pom.xml | 11 +-
...LogRegFromSparkThroughSerializationExample.java | 182 ---------------------
.../LogRegFromSparkThroughPMMLExample.java | 2 +-
.../LogRegFromSparkViaSparkModelParserExample.java | 85 ++++++++++
.../examples/ml/inference/spark/package-info.java | 22 +++
modules/ml/pom.xml | 15 +-
modules/ml/spark-model-parser/pom.xml | 58 +++++++
.../ml/sparkmodelparser/SparkModelParser.java | 132 +++++++++++++++
.../ml/sparkmodelparser/SupportedSparkModels.java | 27 +++
.../UnsupportedSparkModelException.java | 35 ++++
.../ignite/ml/sparkmodelparser/package-info.java | 22 +++
pom.xml | 1 +
12 files changed, 400 insertions(+), 192 deletions(-)
diff --git a/examples/pom.xml b/examples/pom.xml
index 285a0d9..3563d74 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -90,6 +90,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-ml-spark-model-parser</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
@@ -150,12 +156,13 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
- <version>1.9.0</version>
+ <version>1.10.0</version>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>2.7.0</version>
+ <version>2.9.1</version>
</dependency>
</dependencies>
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/LogRegFromSparkThroughSerializationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/LogRegFromSparkThroughSerializationExample.java
deleted file mode 100644
index 15917b6..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/ml/inference/LogRegFromSparkThroughSerializationExample.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.ml.inference;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.examples.ml.tutorial.TitanicUtils;
-import org.apache.ignite.ml.math.functions.IgniteBiFunction;
-import org.apache.ignite.ml.math.primitives.vector.Vector;
-import org.apache.ignite.ml.math.primitives.vector.VectorUtils;
-import org.apache.ignite.ml.math.primitives.vector.impl.DenseVector;
-import org.apache.ignite.ml.regressions.logistic.LogisticRegressionModel;
-import org.apache.ignite.ml.selection.scoring.evaluator.BinaryClassificationEvaluator;
-import org.apache.ignite.ml.selection.scoring.metric.Accuracy;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.example.data.simple.SimpleGroup;
-import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.io.ColumnIOFactory;
-import org.apache.parquet.io.MessageColumnIO;
-import org.apache.parquet.io.RecordReader;
-import org.apache.parquet.schema.MessageType;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Run logistic regression model loaded from snappy.parquet file.
- * The snappy.parquet file was generated by Spark MLLib model.write.overwrite().save(..) operator.
- * <p>
- * You can change the test data used in this example and re-run it to explore this algorithm further.</p>
- */
-public class LogRegFromSparkThroughSerializationExample {
- /** Run example. */
- public static void main(String[] args) throws FileNotFoundException {
- System.out.println();
- System.out.println(">>> Logistic regression model loaded from Spark through serialization over partitioned dataset usage example started.");
- // Start ignite grid.
- try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
- System.out.println(">>> Ignite grid started.");
-
- IgniteCache<Integer, Object[]> dataCache = TitanicUtils.readPassengers(ignite);
-
- IgniteBiFunction<Integer, Object[], Vector> featureExtractor = (k, v) -> {
- double[] data = new double[] {(double)v[0], (double)v[5], (double)v[6]};
- data[0] = Double.isNaN(data[0]) ? 0 : data[0];
- data[1] = Double.isNaN(data[1]) ? 0 : data[1];
- data[2] = Double.isNaN(data[2]) ? 0 : data[2];
-
- return VectorUtils.of(data);
- };
-
- IgniteBiFunction<Integer, Object[], Double> lbExtractor = (k, v) -> (double)v[1];
-
- LogisticRegressionModel mdl = SparkModelParser.load();
-
- System.out.println(">>> Logistic regression model: " + mdl);
-
- double accuracy = BinaryClassificationEvaluator.evaluate(
- dataCache,
- mdl,
- featureExtractor,
- lbExtractor,
- new Accuracy<>()
- );
-
- System.out.println("\n>>> Accuracy " + accuracy);
- System.out.println("\n>>> Test Error " + (1 - accuracy));
- }
- }
-
- /** Util class to build the LogReg model. */
- private static class SparkModelParser {
- /** Parquet path. */
- private static Path parquetPath
- = new Path("examples/src/main/resources/models/spark/serialized/data/part-00000-7551081d-c0a8-4ed7-afe4-a464aabc7f80-c000.snappy.parquet");
-
- /**
- * Load LogReg model from parquet file.
- *
- * @return Instance of LogReg model.
- */
- public static LogisticRegressionModel load() {
- // Default values
- double[] rawCoefficients = {-0.7442986893142758, -0.09165692978346071, 0.24494383939344133};
- Vector coefficients = new DenseVector(rawCoefficients);
- double interceptor = 1.1374435712657005;
-
- Configuration conf = new Configuration();
- try {
- ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, parquetPath, ParquetMetadataConverter.NO_FILTER);
- MessageType schema = readFooter.getFileMetaData().getSchema();
-
- PageReadStore pages;
- try (ParquetFileReader r = new ParquetFileReader(conf, parquetPath, readFooter)) {
- while (null != (pages = r.readNextRowGroup())) {
- final long rows = pages.getRowCount();
- final MessageColumnIO colIO = new ColumnIOFactory().getColumnIO(schema);
- final RecordReader recordReader = colIO.getRecordReader(pages, new GroupRecordConverter(schema));
- for (int i = 0; i < rows; i++) {
- final SimpleGroup g = (SimpleGroup)recordReader.read();
- interceptor = readInterceptor(g);
- coefficients = readCoefficients(g);
- }
- }
- }
- }
- catch (IOException e) {
- System.out.println("Error reading parquet file.");
- e.printStackTrace();
- }
-
- return new LogisticRegressionModel(coefficients, interceptor);
-
- }
-
- /**
- * Read interceptor value from parquet.
- *
- * @param g Interceptor group.
- */
- private static double readInterceptor(SimpleGroup g) {
- double interceptor;
- final SimpleGroup interceptVector = (SimpleGroup)g.getGroup(2, 0);
- final SimpleGroup interceptVectorVal = (SimpleGroup)interceptVector.getGroup(3, 0);
- final SimpleGroup interceptVectorValElement = (SimpleGroup)interceptVectorVal.getGroup(0, 0);
- interceptor = interceptVectorValElement.getDouble(0, 0);
- return interceptor;
- }
-
- /**
- * Read coefficient matrix from parquet.
- *
- * @param g Coefficient group.
- * @return Vector of coefficients.
- */
- @NotNull private static Vector readCoefficients(SimpleGroup g) {
- Vector coefficients;
- final int amountOfCoefficients = g.getGroup(3, 0).getGroup(5, 0).getFieldRepetitionCount(0);
-
- coefficients = new DenseVector(amountOfCoefficients);
-
- for (int j = 0; j < amountOfCoefficients; j++) {
- double coefficient = g.getGroup(3, 0).getGroup(5, 0).getGroup(0, j).getDouble(0, 0);
- coefficients.set(j, coefficient);
- }
- return coefficients;
- }
-
- /**
- * Load mock data Coefficients: [-0.7442986893142758,-0.09165692978346071,0.24494383939344133] Intercept: 1.1374435712657005.
- *
- * @return LogReg Model.
- */
- public static LogisticRegressionModel loadMock() {
- double[] rawCoefficients = {-0.7442986893142758, -0.09165692978346071, 0.24494383939344133};
- Vector coefficients = new DenseVector(rawCoefficients);
- double interceptor = 1.1374435712657005;
- return new LogisticRegressionModel(coefficients, interceptor);
- }
- }
-}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/LogRegFromSparkThroughPMMLExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/LogRegFromSparkThroughPMMLExample.java
similarity index 98%
rename from examples/src/main/java/org/apache/ignite/examples/ml/inference/LogRegFromSparkThroughPMMLExample.java
rename to examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/LogRegFromSparkThroughPMMLExample.java
index 30a4498..acb5cf1 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ml/inference/LogRegFromSparkThroughPMMLExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/LogRegFromSparkThroughPMMLExample.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.examples.ml.inference;
+package org.apache.ignite.examples.ml.inference.spark;
import java.io.File;
import java.io.FileInputStream;
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/LogRegFromSparkViaSparkModelParserExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/LogRegFromSparkViaSparkModelParserExample.java
new file mode 100644
index 0000000..ec911c3
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/LogRegFromSparkViaSparkModelParserExample.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.inference.spark;
+
+import java.io.FileNotFoundException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ml.tutorial.TitanicUtils;
+import org.apache.ignite.ml.math.functions.IgniteBiFunction;
+import org.apache.ignite.ml.math.primitives.vector.Vector;
+import org.apache.ignite.ml.math.primitives.vector.VectorUtils;
+import org.apache.ignite.ml.regressions.logistic.LogisticRegressionModel;
+import org.apache.ignite.ml.selection.scoring.evaluator.BinaryClassificationEvaluator;
+import org.apache.ignite.ml.selection.scoring.metric.Accuracy;
+import org.apache.ignite.ml.sparkmodelparser.SparkModelParser;
+import org.apache.ignite.ml.sparkmodelparser.SupportedSparkModels;
+
+/**
+ * Loads a logistic regression model from a snappy.parquet file and evaluates its accuracy on the passengers data.
+ * The snappy.parquet file was generated by Spark MLLib model.write.overwrite().save(..) operator.
+ * <p>
+ * You can change the test data used in this example and re-run it to explore this algorithm further.</p>
+ */
+public class LogRegFromSparkViaSparkModelParserExample {
+    /** Path to Spark LogReg model. */
+    public static final String SPARK_LOG_REG_MDL_PATH = "examples/src/main/resources/models/spark/serialized/data" +
+        "/part-00000-7551081d-c0a8-4ed7-afe4-a464aabc7f80-c000.snappy.parquet";
+
+    /**
+     * Run example.
+     *
+     * @param args Command line arguments (not used).
+     * @throws FileNotFoundException Propagated from the example code (presumably from reading the passengers data).
+     */
+    public static void main(String[] args) throws FileNotFoundException {
+        System.out.println();
+        System.out.println(">>> Logistic regression model loaded from Spark through serialization over partitioned dataset usage example started.");
+        // Start ignite grid; it is closed automatically at the end of the try block.
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println(">>> Ignite grid started.");
+
+            IgniteCache<Integer, Object[]> dataCache = TitanicUtils.readPassengers(ignite);
+
+            // Features are columns 0, 5 and 6 of a row, NaN values are replaced with 0.
+            IgniteBiFunction<Integer, Object[], Vector> featureExtractor = (k, v) -> {
+                double[] data = new double[] {(double)v[0], (double)v[5], (double)v[6]};
+                for (int i = 0; i < data.length; i++)
+                    if (Double.isNaN(data[i]))
+                        data[i] = 0;
+
+                return VectorUtils.of(data);
+            };
+
+            // The label is column 1 of a row.
+            IgniteBiFunction<Integer, Object[], Double> lbExtractor = (k, v) -> (double)v[1];
+
+            LogisticRegressionModel mdl = (LogisticRegressionModel)SparkModelParser
+                .parse(SPARK_LOG_REG_MDL_PATH, SupportedSparkModels.LOG_REGRESSION);
+
+            System.out.println(">>> Logistic regression model: " + mdl);
+
+            double accuracy = BinaryClassificationEvaluator
+                .evaluate(dataCache, mdl, featureExtractor, lbExtractor, new Accuracy<>());
+
+            System.out.println("\n>>> Accuracy " + accuracy);
+            System.out.println("\n>>> Test Error " + (1 - accuracy));
+        }
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/package-info.java
new file mode 100644
index 0000000..1db3cd2
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/spark/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Model inference examples for Spark ML models.
+ */
+package org.apache.ignite.examples.ml.inference.spark;
diff --git a/modules/ml/pom.xml b/modules/ml/pom.xml
index 6809098..db64d86 100644
--- a/modules/ml/pom.xml
+++ b/modules/ml/pom.xml
@@ -19,7 +19,8 @@
<!--
POM file.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<commons.math.version>3.6.1</commons.math.version>
@@ -53,16 +54,16 @@
</dependency>
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-indexing</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-indexing</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-spring</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
diff --git a/modules/ml/spark-model-parser/pom.xml b/modules/ml/spark-model-parser/pom.xml
new file mode 100644
index 0000000..570469d
--- /dev/null
+++ b/modules/ml/spark-model-parser/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>ignite-parent</artifactId>
+ <groupId>org.apache.ignite</groupId>
+ <version>1</version>
+ <relativePath>../../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-ml-spark-model-parser</artifactId>
+ <version>2.8.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-ml</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.10.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.9.1</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/SparkModelParser.java b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/SparkModelParser.java
new file mode 100644
index 0000000..351ca0b
--- /dev/null
+++ b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/SparkModelParser.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.sparkmodelparser;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.inference.Model;
+import org.apache.ignite.ml.math.primitives.vector.Vector;
+import org.apache.ignite.ml.math.primitives.vector.impl.DenseVector;
+import org.apache.ignite.ml.regressions.logistic.LogisticRegressionModel;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+
+/** Parser of Spark models. */
+public class SparkModelParser {
+    /**
+     * Load model from parquet file. A file that cannot be read causes {@code java.io.UncheckedIOException}.
+     *
+     * @param pathToMdl Path to model saved from Spark, resolved via {@code IgniteUtils.resolveIgnitePath}.
+     * @param parsedSparkMdl One of supported Spark models to parse it.
+     * @return Instance of parsedSparkMdl model.
+     * @throws IllegalArgumentException If the path cannot be resolved or the file holds no model record.
+     * @throws UnsupportedSparkModelException If there is no loader for the requested kind of model.
+     */
+    public static Model parse(String pathToMdl, SupportedSparkModels parsedSparkMdl) {
+        File mdlRsrc = IgniteUtils.resolveIgnitePath(pathToMdl);
+        if (mdlRsrc == null)
+            throw new IllegalArgumentException("Resource not found [resource_path=" + pathToMdl + "]");
+
+        switch (parsedSparkMdl) {
+            case LOG_REGRESSION:
+                return loadLogRegModel(mdlRsrc.getPath());
+            default:
+                // Report the model kind: the exception expects a model name, not a file path.
+                throw new UnsupportedSparkModelException(parsedSparkMdl.name());
+        }
+    }
+
+    /**
+     * Load logistic regression model.
+     *
+     * @param pathToMdl Path to model.
+     * @return Logistic regression model; if the file holds several records, the last one is used.
+     * @throws java.io.UncheckedIOException If the parquet file cannot be read.
+     * @throws IllegalArgumentException If the parquet file holds no model record.
+     */
+    private static Model loadLogRegModel(String pathToMdl) {
+        Vector coefficients = null;
+        double interceptor = 0;
+
+        try (ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(pathToMdl), new Configuration()))) {
+            PageReadStore pages;
+            final MessageType schema = r.getFooter().getFileMetaData().getSchema();
+            final MessageColumnIO colIO = new ColumnIOFactory().getColumnIO(schema);
+
+            while (null != (pages = r.readNextRowGroup())) {
+                final long rows = pages.getRowCount();
+                final RecordReader recordReader = colIO.getRecordReader(pages, new GroupRecordConverter(schema));
+                for (int i = 0; i < rows; i++) {
+                    final SimpleGroup g = (SimpleGroup)recordReader.read();
+                    interceptor = readInterceptor(g);
+                    coefficients = readCoefficients(g);
+                }
+            }
+        }
+        catch (IOException e) {
+            // Fail fast and keep the cause instead of returning a model with null coefficients.
+            throw new java.io.UncheckedIOException("Error reading parquet file [path=" + pathToMdl + "]", e);
+        }
+
+        if (coefficients == null)
+            throw new IllegalArgumentException("No model record found in parquet file [path=" + pathToMdl + "]");
+
+        return new LogisticRegressionModel(coefficients, interceptor);
+    }
+
+    /**
+     * Read interceptor value from parquet.
+     *
+     * @param g Interceptor group.
+     * @return Interceptor value.
+     */
+    private static double readInterceptor(SimpleGroup g) {
+        final SimpleGroup interceptVector = (SimpleGroup)g.getGroup(2, 0);
+        final SimpleGroup interceptVectorVal = (SimpleGroup)interceptVector.getGroup(3, 0);
+        final SimpleGroup interceptVectorValElement = (SimpleGroup)interceptVectorVal.getGroup(0, 0);
+
+        return interceptVectorValElement.getDouble(0, 0);
+    }
+
+    /**
+     * Read coefficient matrix from parquet.
+     *
+     * @param g Coefficient group.
+     * @return Vector of coefficients.
+     */
+    private static Vector readCoefficients(SimpleGroup g) {
+        final int amountOfCoefficients = g.getGroup(3, 0).getGroup(5, 0).getFieldRepetitionCount(0);
+        Vector coefficients = new DenseVector(amountOfCoefficients);
+
+        for (int j = 0; j < amountOfCoefficients; j++) {
+            double coefficient = g.getGroup(3, 0).getGroup(5, 0).getGroup(0, j).getDouble(0, 0);
+            coefficients.set(j, coefficient);
+        }
+        return coefficients;
+    }
+}
diff --git a/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/SupportedSparkModels.java b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/SupportedSparkModels.java
new file mode 100644
index 0000000..9b203ea
--- /dev/null
+++ b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/SupportedSparkModels.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.sparkmodelparser;
+
+/**
+ * List of supported Spark models, used to tell {@code SparkModelParser.parse} which kind of model to load.
+ *
+ * NOTE: Valid for Spark 2.4.
+ */
+public enum SupportedSparkModels {
+ /** Logistic regression. */LOG_REGRESSION
+}
diff --git a/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/UnsupportedSparkModelException.java b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/UnsupportedSparkModelException.java
new file mode 100644
index 0000000..855e8e7
--- /dev/null
+++ b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/UnsupportedSparkModelException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.sparkmodelparser;
+
+import org.apache.ignite.IgniteException;
+
+/** Indicates an unsupported Spark Model. */
+public class UnsupportedSparkModelException extends IgniteException {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates the exception with a message that ends with the given model name.
+ *
+ * @param modelName Model name that caused this exception.
+ */
+ public UnsupportedSparkModelException(String modelName) {
+ super("This Spark ML model is not supported yet: " + modelName);
+ }
+}
diff --git a/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/package-info.java b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/package-info.java
new file mode 100644
index 0000000..e5b722d
--- /dev/null
+++ b/modules/ml/spark-model-parser/src/main/java/org/apache/ignite/ml/sparkmodelparser/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Base package for Spark model parser.
+ */
+package org.apache.ignite.ml.sparkmodelparser;
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f8653f9..36f85df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@
<module>modules/rocketmq</module>
<module>modules/sqlline</module>
<module>modules/ml</module>
+ <module>modules/ml/spark-model-parser</module>
<module>modules/ml/xgboost-model-parser</module>
<module>modules/tensorflow</module>
</modules>