You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ch...@apache.org on 2018/11/21 10:31:58 UTC
[2/2] ignite git commit: IGNITE-10234: ML: Create a skeleton for
model inference in Apache Ignite
IGNITE-10234: ML: Create a skeleton for model inference in Apache Ignite
this closes #5415
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ec118646
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ec118646
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ec118646
Branch: refs/heads/master
Commit: ec118646862cb15e5d8eaff5f855be93996ebe26
Parents: 4074a9b
Author: Anton Dmitriev <dm...@gmail.com>
Authored: Wed Nov 21 13:31:28 2018 +0300
Committer: Yury Babak <yb...@gridgain.com>
Committed: Wed Nov 21 13:31:28 2018 +0300
----------------------------------------------------------------------
...niteFunctionDistributedInferenceExample.java | 100 +++++
.../TensorFlowDistributedInferenceExample.java | 99 +++++
.../TensorFlowLocalInferenceExample.java | 85 +++++
.../TensorFlowThreadedInferenceExample.java | 95 +++++
.../examples/ml/inference/package-info.java | 22 ++
.../ml/util/datasets/t10k-images-idx3-ubyte | Bin 0 -> 7840016 bytes
.../ml/util/datasets/t10k-labels-idx1-ubyte | Bin 0 -> 10008 bytes
.../resources/ml/mnist_tf_model/saved_model.pb | Bin 0 -> 37185 bytes
.../variables/variables.data-00000-of-00001 | Bin 0 -> 13098544 bytes
.../ml/mnist_tf_model/variables/variables.index | Bin 0 -> 410 bytes
modules/ml/pom.xml | 13 +
.../apache/ignite/ml/inference/InfModel.java | 37 ++
.../ignite/ml/inference/ModelDescriptor.java | 86 +++++
.../ignite/ml/inference/ModelSignature.java | 62 ++++
.../inference/builder/AsyncInfModelBuilder.java | 43 +++
.../IgniteDistributedInfModelBuilder.java | 367 +++++++++++++++++++
.../builder/SingleInfModelBuilder.java | 34 ++
.../inference/builder/SyncInfModelBuilder.java | 42 +++
.../builder/ThreadedInfModelBuilder.java | 86 +++++
.../ml/inference/builder/package-info.java | 22 ++
.../ignite/ml/inference/package-info.java | 22 ++
.../parser/IgniteFunctionInfModelParser.java | 76 ++++
.../ml/inference/parser/InfModelParser.java | 38 ++
.../parser/TensorFlowBaseInfModelParser.java | 216 +++++++++++
.../parser/TensorFlowGraphInfModelParser.java | 40 ++
.../TensorFlowSavedModelInfModelParser.java | 70 ++++
.../ml/inference/parser/package-info.java | 22 ++
.../reader/FileSystemInfModelReader.java | 61 +++
.../reader/InMemoryInfModelReader.java | 67 ++++
.../ml/inference/reader/InfModelReader.java | 33 ++
.../ml/inference/reader/package-info.java | 22 ++
.../storage/IgniteModelDescriptorStorage.java | 57 +++
.../storage/LocalModelDescriptorStorage.java | 45 +++
.../storage/ModelDescriptorStorage.java | 48 +++
.../ml/inference/storage/package-info.java | 22 ++
.../ml/inference/util/DirectorySerializer.java | 130 +++++++
.../ignite/ml/inference/util/package-info.java | 22 ++
.../org/apache/ignite/ml/util/MnistUtils.java | 79 ++--
.../org/apache/ignite/ml/IgniteMLTestSuite.java | 2 +
.../ignite/ml/inference/InferenceTestSuite.java | 38 ++
.../IgniteDistributedInfModelBuilderTest.java | 71 ++++
.../builder/InfModelBuilderTestUtil.java | 53 +++
.../builder/SingleInfModelBuilderTest.java | 42 +++
.../builder/ThreadedInfModelBuilderTest.java | 44 +++
.../inference/util/DirectorySerializerTest.java | 124 +++++++
45 files changed, 2613 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/inference/IgniteFunctionDistributedInferenceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/IgniteFunctionDistributedInferenceExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/IgniteFunctionDistributedInferenceExample.java
new file mode 100644
index 0000000..da9d543
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/IgniteFunctionDistributedInferenceExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.inference;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.examples.ml.regression.linear.LinearRegressionLSQRTrainerExample;
+import org.apache.ignite.examples.ml.util.MLSandboxDatasets;
+import org.apache.ignite.examples.ml.util.SandboxMLCache;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.builder.IgniteDistributedInfModelBuilder;
+import org.apache.ignite.ml.inference.parser.IgniteFunctionInfModelParser;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InMemoryInfModelReader;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+import org.apache.ignite.ml.math.primitives.vector.Vector;
+import org.apache.ignite.ml.regressions.linear.LinearRegressionLSQRTrainer;
+import org.apache.ignite.ml.regressions.linear.LinearRegressionModel;
+
+/**
+ * This example is based on {@link LinearRegressionLSQRTrainerExample}, but to perform inference it uses an approach
+ * implemented in {@link org.apache.ignite.ml.inference} package.
+ */
+public class IgniteFunctionDistributedInferenceExample {
+ /** Run example. */
+ public static void main(String... args) throws IOException, ExecutionException, InterruptedException {
+ System.out.println();
+ System.out.println(">>> Linear regression model over cache based dataset usage example started.");
+ // Start ignite grid.
+ try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+ System.out.println(">>> Ignite grid started.");
+
+ IgniteCache<Integer, Vector> dataCache = new SandboxMLCache(ignite)
+ .fillCacheWith(MLSandboxDatasets.MORTALITY_DATA);
+
+ System.out.println(">>> Create new linear regression trainer object.");
+ LinearRegressionLSQRTrainer trainer = new LinearRegressionLSQRTrainer();
+
+ System.out.println(">>> Perform the training to get the model.");
+ LinearRegressionModel mdl = trainer.fit(
+ ignite,
+ dataCache,
+ (k, v) -> v.copyOfRange(1, v.size()),
+ (k, v) -> v.get(0)
+ );
+
+ System.out.println(">>> Linear regression model: " + mdl);
+
+ System.out.println(">>> Preparing model reader and model parser.");
+ InfModelReader reader = new InMemoryInfModelReader(mdl);
+ InfModelParser<Vector, Double> parser = new IgniteFunctionInfModelParser<>();
+ try (InfModel<Vector, Future<Double>> infMdl = new IgniteDistributedInfModelBuilder(ignite, 4, 4)
+ .build(reader, parser)) {
+ System.out.println(">>> Inference model is ready.");
+
+ System.out.println(">>> ---------------------------------");
+ System.out.println(">>> | Prediction\t| Ground Truth\t|");
+ System.out.println(">>> ---------------------------------");
+
+ try (QueryCursor<Cache.Entry<Integer, Vector>> observations = dataCache.query(new ScanQuery<>())) {
+ for (Cache.Entry<Integer, Vector> observation : observations) {
+ Vector val = observation.getValue();
+ Vector inputs = val.copyOfRange(1, val.size());
+ double groundTruth = val.get(0);
+
+ double prediction = infMdl.predict(inputs).get();
+
+ System.out.printf(">>> | %.4f\t\t| %.4f\t\t|\n", prediction, groundTruth);
+ }
+ }
+ }
+
+ System.out.println(">>> ---------------------------------");
+
+ System.out.println(">>> Linear regression model over cache based dataset usage example completed.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowDistributedInferenceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowDistributedInferenceExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowDistributedInferenceExample.java
new file mode 100644
index 0000000..cc22df3
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowDistributedInferenceExample.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.inference;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.builder.IgniteDistributedInfModelBuilder;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.parser.TensorFlowSavedModelInfModelParser;
+import org.apache.ignite.ml.inference.reader.FileSystemInfModelReader;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+import org.apache.ignite.ml.util.MnistUtils;
+import org.tensorflow.Tensor;
+
+/**
+ * This example demonstrates how to: load TensorFlow model into Java, make inference in distributed environment using
+ * Apache Ignite services.
+ */
+public class TensorFlowDistributedInferenceExample {
+ /** Path to the directory with saved TensorFlow model. */
+ private static final String MODEL_PATH = "examples/src/main/resources/ml/mnist_tf_model";
+
+ /** Path to the MNIST images data. */
+ private static final String MNIST_IMG_PATH = "org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte";
+
+ /** Path to the MNIST labels data. */
+ private static final String MNIST_LBL_PATH = "org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte";
+
+ /** Run example. */
+ public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
+ try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+ File mdlRsrc = IgniteUtils.resolveIgnitePath(MODEL_PATH);
+ if (mdlRsrc == null)
+ throw new IllegalArgumentException("Resource not found [resource_path=" + MODEL_PATH + "]");
+
+ InfModelReader reader = new FileSystemInfModelReader(mdlRsrc.getPath());
+
+ InfModelParser<double[], Long> parser = new TensorFlowSavedModelInfModelParser<double[], Long>("serve")
+
+ .withInput("Placeholder", doubles -> {
+ float[][][] reshaped = new float[1][28][28];
+ for (int i = 0; i < doubles.length; i++)
+ reshaped[0][i / 28][i % 28] = (float)doubles[i];
+ return Tensor.create(reshaped);
+ })
+
+ .withOutput(Collections.singletonList("ArgMax"), collectedTensors -> {
+ return collectedTensors.get("ArgMax").copyTo(new long[1])[0];
+ });
+
+ List<MnistUtils.MnistLabeledImage> images = MnistUtils.mnistAsListFromResource(
+ MNIST_IMG_PATH,
+ MNIST_LBL_PATH,
+ new Random(0),
+ 10000
+ );
+
+ long t0 = System.currentTimeMillis();
+
+ try (InfModel<double[], Future<Long>> threadedMdl = new IgniteDistributedInfModelBuilder(ignite, 4, 4)
+ .build(reader, parser)) {
+ List<Future<?>> futures = new ArrayList<>(images.size());
+ for (MnistUtils.MnistLabeledImage image : images)
+ futures.add(threadedMdl.predict(image.getPixels()));
+ for (Future<?> f : futures)
+ f.get();
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ System.out.println("Threaded model throughput: " + images.size() / ((t1 - t0) / 1000.0) + " req/sec");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowLocalInferenceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowLocalInferenceExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowLocalInferenceExample.java
new file mode 100644
index 0000000..fc25c7e
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowLocalInferenceExample.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.inference;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.builder.SingleInfModelBuilder;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.parser.TensorFlowSavedModelInfModelParser;
+import org.apache.ignite.ml.inference.reader.FileSystemInfModelReader;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+import org.apache.ignite.ml.util.MnistUtils;
+import org.tensorflow.Tensor;
+
+/**
+ * This example demonstrates how to: load TensorFlow model into Java, make inference using this model in one thread.
+ */
+public class TensorFlowLocalInferenceExample {
+ /** Path to the directory with saved TensorFlow model. */
+ private static final String MODEL_PATH = "examples/src/main/resources/ml/mnist_tf_model";
+
+ /** Path to the MNIST images data. */
+ private static final String MNIST_IMG_PATH = "org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte";
+
+ /** Path to the MNIST labels data. */
+ private static final String MNIST_LBL_PATH = "org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte";
+
+ /** Run example. */
+ public static void main(String[] args) throws IOException {
+ File mdlRsrc = IgniteUtils.resolveIgnitePath(MODEL_PATH);
+ if (mdlRsrc == null)
+ throw new IllegalArgumentException("Resource not found [resource_path=" + MODEL_PATH + "]");
+
+ InfModelReader reader = new FileSystemInfModelReader(mdlRsrc.getPath());
+
+ InfModelParser<double[], Long> parser = new TensorFlowSavedModelInfModelParser<double[], Long>("serve")
+ .withInput("Placeholder", doubles -> {
+ float[][][] reshaped = new float[1][28][28];
+ for (int i = 0; i < doubles.length; i++)
+ reshaped[0][i / 28][i % 28] = (float)doubles[i];
+ return Tensor.create(reshaped);
+ })
+ .withOutput(Collections.singletonList("ArgMax"), collectedTensors -> {
+ return collectedTensors.get("ArgMax").copyTo(new long[1])[0];
+ });
+
+ List<MnistUtils.MnistLabeledImage> images = MnistUtils.mnistAsListFromResource(
+ MNIST_IMG_PATH,
+ MNIST_LBL_PATH,
+ new Random(0),
+ 10000
+ );
+
+ long t0 = System.currentTimeMillis();
+
+ try (InfModel<double[], Long> locMdl = new SingleInfModelBuilder().build(reader, parser)) {
+ for (MnistUtils.MnistLabeledImage image : images)
+ locMdl.predict(image.getPixels());
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ System.out.println("Threaded model throughput: " + 1.0 * images.size() / ((t1 - t0) / 1000) + " req/sec");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowThreadedInferenceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowThreadedInferenceExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowThreadedInferenceExample.java
new file mode 100644
index 0000000..d756016
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/TensorFlowThreadedInferenceExample.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.ml.inference;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.builder.ThreadedInfModelBuilder;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.parser.TensorFlowSavedModelInfModelParser;
+import org.apache.ignite.ml.inference.reader.FileSystemInfModelReader;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+import org.apache.ignite.ml.util.MnistUtils;
+import org.tensorflow.Tensor;
+
+/**
+ * This example demonstrates how to: load TensorFlow model into Java, make inference using this model in multiple
+ * threads.
+ */
+public class TensorFlowThreadedInferenceExample {
+ /** Path to the directory with saved TensorFlow model. */
+ private static final String MODEL_PATH = "examples/src/main/resources/ml/mnist_tf_model";
+
+ /** Path to the MNIST images data. */
+ private static final String MNIST_IMG_PATH = "org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte";
+
+ /** Path to the MNIST labels data. */
+ private static final String MNIST_LBL_PATH = "org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte";
+
+ /** Run example. */
+ public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
+ File mdlRsrc = IgniteUtils.resolveIgnitePath(MODEL_PATH);
+ if (mdlRsrc == null)
+ throw new IllegalArgumentException("Resource not found [resource_path=" + MODEL_PATH + "]");
+
+ InfModelReader reader = new FileSystemInfModelReader(mdlRsrc.getPath());
+
+ InfModelParser<double[], Long> parser = new TensorFlowSavedModelInfModelParser<double[], Long>("serve")
+
+ .withInput("Placeholder", doubles -> {
+ float[][][] reshaped = new float[1][28][28];
+ for (int i = 0; i < doubles.length; i++)
+ reshaped[0][i / 28][i % 28] = (float)doubles[i];
+ return Tensor.create(reshaped);
+ })
+
+ .withOutput(Collections.singletonList("ArgMax"), collectedTensors -> {
+ return collectedTensors.get("ArgMax").copyTo(new long[1])[0];
+ });
+
+ List<MnistUtils.MnistLabeledImage> images = MnistUtils.mnistAsListFromResource(
+ MNIST_IMG_PATH,
+ MNIST_LBL_PATH,
+ new Random(0),
+ 10000
+ );
+
+ long t0 = System.currentTimeMillis();
+
+ try (InfModel<double[], Future<Long>> threadedMdl = new ThreadedInfModelBuilder(8)
+ .build(reader, parser)) {
+ List<Future<?>> futures = new ArrayList<>(images.size());
+ for (MnistUtils.MnistLabeledImage image : images)
+ futures.add(threadedMdl.predict(image.getPixels()));
+ for (Future<?> f : futures)
+ f.get();
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ System.out.println("Threaded model throughput: " + 1.0 * images.size() / ((t1 - t0) / 1000) + " req/sec");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/inference/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/inference/package-info.java b/examples/src/main/java/org/apache/ignite/examples/ml/inference/package-info.java
new file mode 100644
index 0000000..db1616b
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/ml/inference/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Model inference examples.
+ */
+package org.apache.ignite.examples.ml.inference;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte b/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte
new file mode 100644
index 0000000..1170b2c
Binary files /dev/null and b/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-images-idx3-ubyte differ
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte b/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte
new file mode 100644
index 0000000..d1c3a97
Binary files /dev/null and b/examples/src/main/java/org/apache/ignite/examples/ml/util/datasets/t10k-labels-idx1-ubyte differ
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/resources/ml/mnist_tf_model/saved_model.pb
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/ml/mnist_tf_model/saved_model.pb b/examples/src/main/resources/ml/mnist_tf_model/saved_model.pb
new file mode 100644
index 0000000..4d36671
Binary files /dev/null and b/examples/src/main/resources/ml/mnist_tf_model/saved_model.pb differ
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/resources/ml/mnist_tf_model/variables/variables.data-00000-of-00001
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/ml/mnist_tf_model/variables/variables.data-00000-of-00001 b/examples/src/main/resources/ml/mnist_tf_model/variables/variables.data-00000-of-00001
new file mode 100644
index 0000000..a65398f
Binary files /dev/null and b/examples/src/main/resources/ml/mnist_tf_model/variables/variables.data-00000-of-00001 differ
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/examples/src/main/resources/ml/mnist_tf_model/variables/variables.index
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/ml/mnist_tf_model/variables/variables.index b/examples/src/main/resources/ml/mnist_tf_model/variables/variables.index
new file mode 100644
index 0000000..221dd2d
Binary files /dev/null and b/examples/src/main/resources/ml/mnist_tf_model/variables/variables.index differ
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/pom.xml
----------------------------------------------------------------------
diff --git a/modules/ml/pom.xml b/modules/ml/pom.xml
index ad31da2..69c77ff 100644
--- a/modules/ml/pom.xml
+++ b/modules/ml/pom.xml
@@ -23,6 +23,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<commons.math.version>3.6.1</commons.math.version>
+ <tensorflow.version>1.12.0</tensorflow.version>
</properties>
<parent>
@@ -116,6 +117,18 @@
</dependency>
<dependency>
+ <groupId>org.tensorflow</groupId>
+ <artifactId>tensorflow</artifactId>
+ <version>${tensorflow.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.tensorflow</groupId>
+ <artifactId>proto</artifactId>
+ <version>${tensorflow.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java
new file mode 100644
index 0000000..ea96407
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/InfModel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference;
+
+/**
+ * Inference model that can be used to make predictions.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+public interface InfModel<I, O> extends AutoCloseable {
+ /**
+ * Make a prediction for the specified input arguments.
+ *
+ * @param input Input arguments.
+ * @return Prediction result.
+ */
+ public O predict(I input);
+
+ /** {@inheritDoc} */
+ @Override public void close();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java
new file mode 100644
index 0000000..e156063
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelDescriptor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+
+/**
+ * Model descriptor that encapsulates information about model, {@link InfModelReader} and {@link InfModelParser} which
+ * is required to build the model.
+ */
+public class ModelDescriptor implements Serializable {
+ /** Model name. */
+ private final String name;
+
+ /** Model description. */
+ private final String desc;
+
+ /** Model signature that keeps input/output types in Protobuf. */
+ private final ModelSignature signature;
+
+ /** Model reader. */
+ private final InfModelReader reader;
+
+ /** Model parser. */
+ private final InfModelParser<byte[], byte[]> parser;
+
+ /**
+ * Constructs a new instance of model descriptor.
+ *
+ * @param name Model name.
+ * @param desc Model description.
+ * @param signature Model signature that keeps input/output types in Protobuf.
+ * @param reader Model reader.
+ * @param parser Model parser.
+ */
+ public ModelDescriptor(String name, String desc, ModelSignature signature, InfModelReader reader,
+ InfModelParser<byte[], byte[]> parser) {
+ this.name = name;
+ this.desc = desc;
+ this.signature = signature;
+ this.reader = reader;
+ this.parser = parser;
+ }
+
+ /** */
+ public String getName() {
+ return name;
+ }
+
+ /** */
+ public String getDesc() {
+ return desc;
+ }
+
+ /** */
+ public ModelSignature getSignature() {
+ return signature;
+ }
+
+ /** */
+ public InfModelReader getReader() {
+ return reader;
+ }
+
+ /** */
+ public InfModelParser<byte[], byte[]> getParser() {
+ return parser;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelSignature.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelSignature.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelSignature.java
new file mode 100644
index 0000000..91844e5
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/ModelSignature.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference;
+
+import java.io.Serializable;
+
+/**
+ * Signature that defines input/output types in Protobuf.
+ */
+public class ModelSignature implements Serializable {
+ /** Protobuf schema of all objects required in the model. */
+ private final String schema;
+
+ /** Name of the input type (should be presented in the {@link #schema}. */
+ private final String inputMsg;
+
+ /** Name of ths output type (should be presented in the {@link #schema}). */
+ private final String outputMsg;
+
+ /**
+ * Constructs a new instance of model signature.
+ *
+ * @param schema Protobuf schema of all objects required in the model.
+ * @param inputMsg Name of the input type (should be presented in the {@link #schema}.
+ * @param outputMsg Name of ths output type (should be presented in the {@link #schema}).
+ */
+ public ModelSignature(String schema, String inputMsg, String outputMsg) {
+ this.schema = schema;
+ this.inputMsg = inputMsg;
+ this.outputMsg = outputMsg;
+ }
+
+ /** */
+ public String getSchema() {
+ return schema;
+ }
+
+ /** */
+ public String getInputMsg() {
+ return inputMsg;
+ }
+
+ /** */
+ public String getOutputMsg() {
+ return outputMsg;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java
new file mode 100644
index 0000000..adf4659
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/AsyncInfModelBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.builder;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+
+/**
+ * Builder of asynchronous inference model. Uses specified model reader (see {@link InfModelReader}) and mode parser
+ * (see {@link InfModelParser}) to build a model.
+ */
+@FunctionalInterface
+public interface AsyncInfModelBuilder {
+ /**
+ * Builds asynchronous inference model using specified model reader and model parser.
+ *
+ * @param reader Model reader.
+ * @param parser Model parser.
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ * @return Inference model.
+ */
+ public <I extends Serializable, O extends Serializable> InfModel<I, Future<O>> build(InfModelReader reader,
+ InfModelParser<I, O> parser);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java
new file mode 100644
index 0000000..7a176e0
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/IgniteDistributedInfModelBuilder.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.builder;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+
+/**
+ * Builder that allows to start Apache Ignite services for distributed inference and get a facade that allows to work
+ * with this distributed inference infrastructure as with a single inference model (see {@link InfModel}).
+ *
+ * The common workflow is based on a request/response queues and multiple workers represented by Apache Ignite services.
+ * When the {@link #build(InfModelReader, InfModelParser)} method is called Apache Ignite starts the specified number of
+ * service instances and request/response queues. Each service instance reads request queue, processes inbound requests
+ * and writes responses to response queue. The facade returned by the {@link #build(InfModelReader, InfModelParser)}
+ * method operates with request/response queues. When the {@link InfModel#predict(Object)} method is called the argument
+ * is sent as a request to the request queue. When the response is appeared in the response queue the {@link Future}
+ * correspondent to the previously sent request is completed and the processing finishes.
+ *
+ * Be aware that {@link InfModel#close()} method must be called to clear allocated resources, stop services and remove
+ * queues.
+ */
+public class IgniteDistributedInfModelBuilder implements AsyncInfModelBuilder {
+ /** Template of the inference service name. */
+ private static final String INFERENCE_SERVICE_NAME_PATTERN = "inference_service_%s";
+
+ /** Template of the inference request queue name. */
+ private static final String INFERENCE_REQUEST_QUEUE_NAME_PATTERN = "inference_queue_req_%s";
+
+ /** Template of the inference response queue name. */
+ private static final String INFERENCE_RESPONSE_QUEUE_NAME_PATTERN = "inference_queue_res_%s";
+
+ /** Default capacity for all queues used in this class (request queue, response queue, received queue). */
+ private static final int QUEUE_CAPACITY = 100;
+
+ /** Default configuration for Apache Ignite queues used in this class (request queue, response queue). */
+ private static final CollectionConfiguration queueCfg = new CollectionConfiguration();
+
+ /** Ignite instance. */
+ private final Ignite ignite;
+
+ /** Number of service instances maintaining to make distributed inference. */
+ private final int instances;
+
+ /** Max per node number of instances. */
+ private final int maxPerNode;
+
+ /**
+ * Constructs a new instance of Ignite distributed inference model builder.
+ *
+ * @param ignite Ignite instance.
+ * @param instances Number of service instances maintaining to make distributed inference.
+ * @param maxPerNode Max per node number of instances.
+ */
+ public IgniteDistributedInfModelBuilder(Ignite ignite, int instances, int maxPerNode) {
+ this.ignite = ignite;
+ this.instances = instances;
+ this.maxPerNode = maxPerNode;
+ }
+
+ /**
+ * Starts the specified in constructor number of service instances and request/response queues. Each service
+ * instance reads request queue, processes inbound requests and writes responses to response queue. The returned
+ * facade is represented by the {@link InfModel} operates with request/response queues, but hides these details
+ * behind {@link InfModel#predict(Object)} method of {@link InfModel}.
+ *
+ * Be aware that {@link InfModel#close()} method must be called to clear allocated resources, stop services and
+ * remove queues.
+ *
+ * @param reader Inference model reader.
+ * @param parser Inference model parser.
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ * @return Facade represented by {@link InfModel}.
+ */
+ @Override public <I extends Serializable, O extends Serializable> InfModel<I, Future<O>> build(
+ InfModelReader reader, InfModelParser<I, O> parser) {
+ return new DistributedInfModel<>(ignite, UUID.randomUUID().toString(), reader, parser, instances, maxPerNode);
+ }
+
+ /**
+ * Facade that operates with request/response queues to make distributed inference, but hides these details
+ * behind {@link InfModel#predict(Object)} method of {@link InfModel}.
+ *
+ * Be aware that {@link InfModel#close()} method must be called to clear allocated resources, stop services and
+ * remove queues.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+ private static class DistributedInfModel<I extends Serializable, O extends Serializable>
+ implements InfModel<I, Future<O>> {
+ /** Ignite instance. */
+ private final Ignite ignite;
+
+ /** Suffix that with correspondent templates formats service and queue names. */
+ private final String suffix;
+
+ /** Request queue. */
+ private final IgniteQueue<I> reqQueue;
+
+ /** Response queue. */
+ private final IgniteQueue<O> resQueue;
+
+ /** Futures that represents requests that have been sent, but haven't been responded yet. */
+ private final BlockingQueue<CompletableFuture<O>> futures = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
+
+ /** Thread pool for receiver to work in. */
+ private final ExecutorService receiverThreadPool = Executors.newSingleThreadExecutor();
+
+ /** Flag identified that model is up and running. */
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+ /** Receiver future. */
+ private volatile Future<?> receiverFut;
+
+ /**
+ * Constructs a new instance of distributed inference model.
+ *
+ * @param ignite Ignite instance.
+ * @param suffix Suffix that with correspondent templates formats service and queue names.
+ * @param reader Inference model reader.
+ * @param parser Inference model parser.
+ * @param instances Number of service instances maintaining to make distributed inference.
+ * @param maxPerNode Max per node number of instances.
+ */
+ DistributedInfModel(Ignite ignite, String suffix, InfModelReader reader, InfModelParser<I, O> parser,
+ int instances, int maxPerNode) {
+ this.ignite = ignite;
+ this.suffix = suffix;
+
+ reqQueue = ignite.queue(String.format(INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY,
+ queueCfg);
+ resQueue = ignite.queue(String.format(INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY,
+ queueCfg);
+
+ startReceiver();
+ startService(reader, parser, instances, maxPerNode);
+
+ running.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Future<O> predict(I input) {
+ if (!running.get())
+ throw new IllegalStateException("Inference model is not running");
+
+ reqQueue.put(input);
+
+ try {
+ CompletableFuture<O> fut = new CompletableFuture<>();
+ futures.put(fut);
+ return fut;
+ }
+ catch (InterruptedException e) {
+ close(); // In case of exception in the above code the model state becomes invalid and model is closed.
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Starts Apache Ignite services that represent distributed inference infrastructure.
+ *
+ * @param reader Inference model reader.
+ * @param parser Inference model parser.
+ * @param instances Number of service instances maintaining to make distributed inference.
+ * @param maxPerNode Max per node number of instances.
+ */
+ private void startService(InfModelReader reader, InfModelParser<I, O> parser, int instances, int maxPerNode) {
+ ignite.services().deployMultiple(
+ String.format(INFERENCE_SERVICE_NAME_PATTERN, suffix),
+ new IgniteDistributedInfModelService<>(reader, parser, suffix),
+ instances,
+ maxPerNode
+ );
+ }
+
+ /**
+ * Stops Apache Ignite services that represent distributed inference infrastructure.
+ */
+ private void stopService() {
+ ignite.services().cancel(String.format(INFERENCE_SERVICE_NAME_PATTERN, suffix));
+ }
+
+ /**
+ * Starts the thread that reads the response queue and completed correspondent futures from {@link #futures}
+ * queue.
+ */
+ private void startReceiver() {
+ receiverFut = receiverThreadPool.submit(() -> {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ O res;
+ try {
+ res = resQueue.take();
+ }
+ catch (IllegalStateException e) {
+ if (!resQueue.removed())
+ throw e;
+ continue;
+ }
+
+ CompletableFuture<O> fut = futures.remove();
+ fut.complete(res);
+ }
+ }
+ finally {
+ close(); // If the model is not stopped yet we need to stop it to protect queue from new writes.
+ while (!futures.isEmpty()) {
+ CompletableFuture<O> fut = futures.remove();
+ fut.cancel(true);
+ }
+ }
+ });
+ }
+
+ /**
+ * Stops receiver thread that reads the response queue and completed correspondent futures from
+ * {@link #futures} queue.
+ */
+ private void stopReceiver() {
+ if (receiverFut != null && !receiverFut.isDone())
+ receiverFut.cancel(true);
+ // The receiver thread pool is not reused, so it should be closed here.
+ receiverThreadPool.shutdown();
+ }
+
+ /**
+ * Remove request/response Ignite queues.
+ */
+ private void removeQueues() {
+ reqQueue.close();
+ resQueue.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ boolean runningBefore = running.getAndSet(false);
+
+ if (runningBefore) {
+ stopService();
+ stopReceiver();
+ removeQueues();
+ }
+ }
+ }
+
+ /**
+ * Apache Ignite service that makes inference reading requests from the request queue and writing responses to the
+ * response queue. This service is assumed to be deployed in {@link #build(InfModelReader, InfModelParser)} method
+ * and cancelled in {@link InfModel#close()} method of the inference model.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+ private static class IgniteDistributedInfModelService<I extends Serializable, O extends Serializable>
+ implements Service {
+ /** */
+ private static final long serialVersionUID = -3596084917874395597L;
+
+ /** Inference model reader. */
+ private final InfModelReader reader;
+
+ /** Inference model parser. */
+ private final InfModelParser<I, O> parser;
+
+ /** Suffix that with correspondent templates formats service and queue names. */
+ private final String suffix;
+
+ /** Request queue, is created in {@link #init(ServiceContext)} method. */
+ private transient IgniteQueue<I> reqQueue;
+
+ /** Response queue, is created in {@link #init(ServiceContext)} method. */
+ private transient IgniteQueue<O> resQueue;
+
+ /** Inference model, is created in {@link #init(ServiceContext)} method. */
+ private transient InfModel<I, O> mdl;
+
+ /**
+ * Constructs a new instance of Ignite distributed inference model service.
+ *
+ * @param reader Inference model reader.
+ * @param parser Inference model parser.
+ * @param suffix Suffix that with correspondent templates formats service and queue names.
+ */
+ IgniteDistributedInfModelService(InfModelReader reader, InfModelParser<I, O> parser, String suffix) {
+ this.reader = reader;
+ this.parser = parser;
+ this.suffix = suffix;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(ServiceContext ctx) {
+ Ignite ignite = Ignition.localIgnite();
+
+ reqQueue = ignite.queue(String.format(INFERENCE_REQUEST_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY,
+ queueCfg);
+ resQueue = ignite.queue(String.format(INFERENCE_RESPONSE_QUEUE_NAME_PATTERN, suffix), QUEUE_CAPACITY,
+ queueCfg);
+
+ mdl = parser.parse(reader.read());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) {
+ while (!ctx.isCancelled()) {
+ I req;
+ try {
+ req = reqQueue.take();
+ }
+ catch (IllegalStateException e) {
+ // If the queue is removed during the take() operation exception should be ignored.
+ if (!reqQueue.removed())
+ throw e;
+ continue;
+ }
+
+ O res = mdl.predict(req);
+
+ try {
+ resQueue.put(res);
+ }
+ catch (IllegalStateException e) {
+ // If the queue is removed during the put() operation exception should be ignored.
+ if (!resQueue.removed())
+ throw e;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ // Do nothing. Queues are assumed to be closed in model close() method.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java
new file mode 100644
index 0000000..f756f45
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SingleInfModelBuilder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.builder;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+
+/**
+ * Implementation of synchronous inference model builder that builds a model processed locally in a single thread.
+ */
+public class SingleInfModelBuilder implements SyncInfModelBuilder {
+ /** {@inheritDoc} */
+ @Override public <I extends Serializable, O extends Serializable> InfModel<I, O> build(InfModelReader reader,
+ InfModelParser<I, O> parser) {
+ return parser.parse(reader.read());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java
new file mode 100644
index 0000000..7aed8b8
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/SyncInfModelBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.builder;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+
+/**
+ * Builder of synchronous inference model. Uses specified model reader (see {@link InfModelReader}) and mode parser (see
+ * {@link InfModelParser}) to build a model.
+ */
+@FunctionalInterface
+public interface SyncInfModelBuilder {
+ /**
+ * Builds synchronous inference model using specified model reader and model parser.
+ *
+ * @param reader Model reader.
+ * @param parser Model parser.
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ * @return Inference model.
+ */
+ public <I extends Serializable, O extends Serializable> InfModel<I, O> build(InfModelReader reader,
+ InfModelParser<I, O> parser);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java
new file mode 100644
index 0000000..ff538de
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/ThreadedInfModelBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.builder;
+
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.inference.parser.InfModelParser;
+import org.apache.ignite.ml.inference.reader.InfModelReader;
+
+/**
+ * Implementation of asynchronous inference model builder that builds model processed locally utilizing specified number
+ * of threads.
+ */
+public class ThreadedInfModelBuilder implements AsyncInfModelBuilder {
+ /** Number of threads to be utilized for model inference. */
+ private final int threads;
+
+ /**
+ * Constructs a new instance of threaded inference model builder.
+ *
+ * @param threads Number of threads to be utilized for model inference.
+ */
+ public ThreadedInfModelBuilder(int threads) {
+ this.threads = threads;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <I extends Serializable, O extends Serializable> InfModel<I, Future<O>> build(
+ InfModelReader reader, InfModelParser<I, O> parser) {
+ return new ThreadedInfModel<>(parser.parse(reader.read()), threads);
+ }
+
+ /**
+ * Threaded inference model that performs inference in multiply threads.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+ private static class ThreadedInfModel<I extends Serializable, O extends Serializable>
+ implements InfModel<I, Future<O>> {
+ /** Inference model. */
+ private final InfModel<I, O> mdl;
+
+ /** Thread pool. */
+ private final ExecutorService threadPool;
+
+ /**
+ * Constructs a new instance of threaded inference model.
+ *
+ * @param mdl Inference model.
+ * @param threads Thread pool.
+ */
+ ThreadedInfModel(InfModel<I, O> mdl, int threads) {
+ this.mdl = mdl;
+ this.threadPool = Executors.newFixedThreadPool(threads);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Future<O> predict(I input) {
+ return threadPool.submit(() -> mdl.predict(input));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ threadPool.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/package-info.java
new file mode 100644
index 0000000..bed2e70
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/builder/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Root package for model inference builders.
+ */
+package org.apache.ignite.ml.inference.builder;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/package-info.java
new file mode 100644
index 0000000..f2ce68c
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Root package for model inference functionality.
+ */
+package org.apache.ignite.ml.inference;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java
new file mode 100644
index 0000000..a4f1377
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/IgniteFunctionInfModelParser.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.parser;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import org.apache.ignite.ml.inference.InfModel;
+import org.apache.ignite.ml.math.functions.IgniteFunction;
+
+/**
+ * Implementation of model parser that accepts serialized {@link IgniteFunction}.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+public class IgniteFunctionInfModelParser<I, O> implements InfModelParser<I, O> {
+ /** */
+ private static final long serialVersionUID = -4624683614990816434L;
+
+ /** {@inheritDoc} */
+ @Override public InfModel<I, O> parse(byte[] mdl) {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(mdl);
+ ObjectInputStream ois = new ObjectInputStream(bais)) {
+ @SuppressWarnings("unchecked")
+ IgniteFunction<I, O> function = (IgniteFunction<I, O>)ois.readObject();
+
+ return new IgniteFunctionInfoModel(function);
+ }
+ catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Inference model that wraps {@link IgniteFunction}.
+ */
+ private class IgniteFunctionInfoModel implements InfModel<I, O> {
+ /** Ignite function. */
+ private final IgniteFunction<I, O> function;
+
+ /**
+ * Constructs a new instance of Ignite function.
+ *
+ * @param function Ignite function.
+ */
+ IgniteFunctionInfoModel(IgniteFunction<I, O> function) {
+ this.function = function;
+ }
+
+ /** {@inheritDoc} */
+ @Override public O predict(I input) {
+ return function.apply(input);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // Do nothing.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java
new file mode 100644
index 0000000..fa62558
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/InfModelParser.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.parser;
+
+import java.io.Serializable;
+import org.apache.ignite.ml.inference.InfModel;
+
+/**
+ * Model parser that accepts a serialized model represented by byte array, parses it and returns {@link InfModel}.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+@FunctionalInterface
+public interface InfModelParser<I, O> extends Serializable {
+ /**
+ * Accepts serialized model represented by byte array, parses it and returns {@link InfModel}.
+ *
+ * @param mdl Serialized model represented by byte array.
+ * @return Inference model.
+ */
+ public InfModel<I, O> parse(byte[] mdl);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowBaseInfModelParser.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowBaseInfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowBaseInfModelParser.java
new file mode 100644
index 0000000..acc521f
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowBaseInfModelParser.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.parser;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.ml.inference.InfModel;
+import org.tensorflow.Session;
+import org.tensorflow.Tensor;
+
+/**
+ * Base class for TensorFlow model parsers. Contains the logic that is common for models saved as "SavedModel" and as a
+ * simple graph.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+public abstract class TensorFlowBaseInfModelParser<I, O> implements InfModelParser<I, O> {
+ /** */
+ private static final long serialVersionUID = 5574259553625871456L;
+
+ /** Map of input graph nodes (placeholders) and transformers that allow to transform input into tensor. */
+ private final Map<String, InputTransformer<I>> inputs = new HashMap<>();
+
+ /** List of output graph nodes. */
+ private List<String> outputNames;
+
+ /** Transformer that allows to transform tensors into output. */
+ private OutputTransformer<O> outputTransformer;
+
+ /** {@inheritDoc} */
+ @Override public InfModel<I, O> parse(byte[] mdl) {
+ return new TensorFlowInfModel(parseModel(mdl));
+ }
+
+ /**
+ * Parses model specified in serialized form as byte array.
+ *
+ * @param mdl Inference model in serialized form as byte array.
+ * @return TensorFlow session that encapsulates the TensorFlow graph parsed from serialized model.
+ */
+ public abstract Session parseModel(byte[] mdl);
+
+ /**
+ * Setter that allows to specify additional input graph node and correspondent transformer that allows to transform
+ * input into tensor.
+ *
+ * @param name Name of the input graph node.
+ * @param transformer Transformer that allows to transform input into tensor.
+ * @return This instance.
+ */
+ public TensorFlowBaseInfModelParser<I, O> withInput(String name, InputTransformer<I> transformer) {
+ if (inputs.containsKey(name))
+ throw new IllegalArgumentException("Inputs already contains specified name [name=" + name + "]");
+
+ inputs.put(name, transformer);
+
+ return this;
+ }
+
+ /**
+ * Setter that allows to specify output graph nodes and correspondent transformer that allow to transform tensors
+ * into output.
+ *
+ * @param names List of output graph node names.
+ * @param transformer Transformer that allow to transform tensors into output.
+ * @return This instance.
+ */
+ public TensorFlowBaseInfModelParser<I, O> withOutput(List<String> names, OutputTransformer<O> transformer) {
+ if (outputNames != null || outputTransformer != null)
+ throw new IllegalArgumentException("Outputs already specified");
+
+ outputNames = names;
+ outputTransformer = transformer;
+
+ return this;
+ }
+
+ /**
+ * Input transformer that accepts input and transforms it into tensor.
+ *
+ * @param <I> Type of model input.
+ */
+ @FunctionalInterface
+ public interface InputTransformer<I> extends Serializable {
+ /**
+ * Transforms input into tensor.
+ *
+ * @param input Input data.
+ * @return Tensor (transformed input data).
+ */
+ public Tensor<?> transform(I input);
+ }
+
+ /**
+ * Output transformer that accepts tensors and transforms them into output.
+ *
+ * @param <O> Type of model output.
+ */
+ @FunctionalInterface
+ public interface OutputTransformer<O> extends Serializable {
+ /**
+ * Transforms tensors into output.
+ *
+ * @param output Tensors.
+ * @return Output (transformed tensors).
+ */
+ public O transform(Map<String, Tensor<?>> output);
+ }
+
+ /**
+ * TensorFlow inference model based on pre-loaded graph and created session.
+ */
+ private class TensorFlowInfModel implements InfModel<I, O> {
+ /** TensorFlow session. */
+ private final Session ses;
+
+ /**
+ * Constructs a new instance of TensorFlow inference model.
+ *
+ * @param ses TensorFlow session.
+ */
+ TensorFlowInfModel(Session ses) {
+ this.ses = ses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public O predict(I input) {
+ Session.Runner runner = ses.runner();
+
+ runner = feedAll(runner, input);
+ runner = fetchAll(runner);
+
+ List<Tensor<?>> prediction = runner.run();
+ Map<String, Tensor<?>> collectedPredictionTensors = indexTensors(prediction);
+
+ return outputTransformer.transform(collectedPredictionTensors);
+ }
+
+ /**
+ * Feeds input into graphs input nodes using input transformers (see {@link #inputs}).
+ *
+ * @param runner TensorFlow session runner.
+ * @param input Input.
+ * @return TensorFlow session runner.
+ */
+ private Session.Runner feedAll(Session.Runner runner, I input) {
+ for (Map.Entry<String, InputTransformer<I>> e : inputs.entrySet()) {
+ String opName = e.getKey();
+ InputTransformer<I> transformer = e.getValue();
+
+ runner = runner.feed(opName, transformer.transform(input));
+ }
+
+ return runner;
+ }
+
+ /**
+ * Specifies graph output nodes to be fetched using {@link #outputNames}.
+ *
+ * @param runner TensorFlow session runner.
+ * @return TensorFlow session runner.
+ */
+ private Session.Runner fetchAll(Session.Runner runner) {
+ for (String e : outputNames)
+ runner.fetch(e);
+
+ return runner;
+ }
+
+ /**
+ * Indexes tensors fetched from graph using {@link #outputNames}.
+ *
+ * @param tensors List of fetched tensors.
+ * @return Map of tensor name as a key and tensor as a value.
+ */
+ private Map<String, Tensor<?>> indexTensors(List<Tensor<?>> tensors) {
+ Map<String, Tensor<?>> collectedTensors = new HashMap<>();
+
+ Iterator<String> outputNamesIter = outputNames.iterator();
+ Iterator<Tensor<?>> tensorsIter = tensors.iterator();
+
+ while (outputNamesIter.hasNext() && tensorsIter.hasNext())
+ collectedTensors.put(outputNamesIter.next(), tensorsIter.next());
+
+ // We expect that output names and output tensors have the same size.
+ if (outputNamesIter.hasNext() || tensorsIter.hasNext())
+ throw new IllegalStateException("Outputs are incorrect");
+
+ return collectedTensors;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ ses.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowGraphInfModelParser.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowGraphInfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowGraphInfModelParser.java
new file mode 100644
index 0000000..7c547ae
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowGraphInfModelParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.parser;
+
+import org.tensorflow.Graph;
+import org.tensorflow.Session;
+
+/**
+ * Implementation of TensorFlow model parser that accepts serialized graph definition.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+public class TensorFlowGraphInfModelParser<I, O> extends TensorFlowBaseInfModelParser<I, O> {
+ /** */
+ private static final long serialVersionUID = -1872566748640565856L;
+
+ /** {@inheritDoc} */
+ @Override public Session parseModel(byte[] mdl) {
+ Graph graph = new Graph();
+ graph.importGraphDef(mdl);
+
+ return new Session(graph);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowSavedModelInfModelParser.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowSavedModelInfModelParser.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowSavedModelInfModelParser.java
new file mode 100644
index 0000000..2ee9f11
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/TensorFlowSavedModelInfModelParser.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.inference.parser;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.ml.inference.util.DirectorySerializer;
+import org.tensorflow.SavedModelBundle;
+import org.tensorflow.Session;
+
+/**
+ * Implementation of TensorFlow model parser that accepts serialized directory with "SavedModel" as an input. The
+ * directory is assumed to be serialized by {@link DirectorySerializer}.
+ *
+ * @param <I> Type of model input.
+ * @param <O> Type of model output.
+ */
+public class TensorFlowSavedModelInfModelParser<I, O> extends TensorFlowBaseInfModelParser<I, O> {
+ /** */
+ private static final long serialVersionUID = 5638083440240281879L;
+
+ /** Prefix to be used to create temporary directory for TensorFlow model files. */
+ private static final String TMP_DIR_PREFIX = "tensorflow_saved_model_";
+
+ /** Model tags. */
+ private final String[] tags;
+
+ /**
+ * Constructs a new instance of TensorFlow model parser.
+ *
+ * @param tags Model tags.
+ */
+ public TensorFlowSavedModelInfModelParser(String... tags) {
+ this.tags = tags;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Session parseModel(byte[] mdl) {
+ Path dir = null;
+ try {
+ dir = Files.createTempDirectory(TMP_DIR_PREFIX);
+ DirectorySerializer.deserialize(dir.toAbsolutePath(), mdl);
+ SavedModelBundle bundle = SavedModelBundle.load(dir.toString(), tags);
+ return bundle.session();
+ }
+ catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ if (dir != null)
+ DirectorySerializer.deleteDirectory(dir);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec118646/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/package-info.java
new file mode 100644
index 0000000..ce8c27b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/inference/parser/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Root package for model inference parsers.
+ */
+package org.apache.ignite.ml.inference.parser;
\ No newline at end of file