You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2018/08/03 09:58:31 UTC
[18/50] [abbrv] ignite git commit: IGNITE-9034: [ML] Add Estimator
API support to TensorFlow cluster on top of Apache Ignite.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
new file mode 100644
index 0000000..3dcd5f8
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/cluster/util/TensorFlowUserScriptRunner.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.cluster.util;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowClusterSpec;
+import org.apache.ignite.tensorflow.cluster.spec.TensorFlowServerAddressSpec;
+import org.apache.ignite.tensorflow.core.util.AsyncNativeProcessRunner;
+import org.apache.ignite.tensorflow.core.util.NativeProcessRunner;
+
+/**
+ * Runner that starts a user script as a native process. Before the process is started the job archive is extracted
+ * into a freshly created temporary directory which is then used as the working directory of the process. The
+ * directory is removed in {@link #doAfter()}.
+ */
+public class TensorFlowUserScriptRunner extends AsyncNativeProcessRunner {
+    /** Ignite instance. */
+    private final Ignite ignite;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Job archive that will be extracted and used as working directory for the native process. */
+    private final TensorFlowJobArchive jobArchive;
+
+    /** TensorFlow cluster specification. */
+    private final TensorFlowClusterSpec clusterSpec;
+
+    /** Output stream data consumer. */
+    private final Consumer<String> out;
+
+    /** Error stream data consumer. */
+    private final Consumer<String> err;
+
+    /** Working directory of the user script process ({@code null} until {@link #doBefore()} creates it). */
+    private File workingDir;
+
+    /**
+     * Constructs a new instance of TensorFlow user script runner.
+     *
+     * @param ignite Ignite instance.
+     * @param executor Executor to be used in {@link AsyncNativeProcessRunner}.
+     * @param jobArchive Job archive that will be extracted and used as working directory for the native process.
+     * @param clusterSpec TensorFlow cluster specification.
+     * @param out Output stream data consumer.
+     * @param err Error stream data consumer.
+     */
+    public TensorFlowUserScriptRunner(Ignite ignite, ExecutorService executor, TensorFlowJobArchive jobArchive,
+        TensorFlowClusterSpec clusterSpec, Consumer<String> out, Consumer<String> err) {
+        super(ignite, executor);
+
+        this.ignite = ignite;
+        this.log = ignite.log().getLogger(TensorFlowUserScriptRunner.class);
+
+        this.jobArchive = jobArchive;
+        this.clusterSpec = clusterSpec;
+        this.out = out;
+        this.err = err;
+    }
+
+    /**
+     * Creates a temporary working directory, extracts the job archive into it and prepares the native process
+     * runner. If extraction or preparation fails the working directory is removed before the exception is
+     * rethrown, so that a failed start does not leave the directory behind.
+     *
+     * @return Prepared native process runner.
+     */
+    @Override public NativeProcessRunner doBefore() {
+        try {
+            workingDir = Files.createTempDirectory("tf_us_").toFile();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Directory has been created [path=" + workingDir.getAbsolutePath() + "]");
+
+        try {
+            unzip(jobArchive.getData(), workingDir);
+
+            if (log.isDebugEnabled())
+                log.debug("Job archive has been extracted [path=" + workingDir.getAbsolutePath() + "]");
+
+            return prepareNativeProcessRunner();
+        }
+        catch (RuntimeException e) {
+            // Clean up here because this method failed and the directory would otherwise be leaked.
+            try {
+                delete(workingDir);
+            }
+            catch (RuntimeException suppressed) {
+                e.addSuppressed(suppressed);
+            }
+
+            workingDir = null;
+
+            throw e;
+        }
+    }
+
+    /** Removes the working directory if it has been created. */
+    @Override public void doAfter() {
+        if (workingDir != null) {
+            delete(workingDir);
+
+            if (log.isDebugEnabled())
+                log.debug("Directory has been deleted [path=" + workingDir.getAbsolutePath() + "]");
+        }
+    }
+
+    /**
+     * Prepares native process runner: specifies working directory, command to be run and environment variables.
+     *
+     * @return Prepared native process runner.
+     */
+    private NativeProcessRunner prepareNativeProcessRunner() {
+        if (workingDir == null)
+            throw new IllegalStateException("Working directory is not created");
+
+        ProcessBuilder procBuilder = new ProcessBuilder();
+
+        procBuilder.directory(workingDir);
+        procBuilder.command(jobArchive.getCommands());
+
+        Map<String, String> env = procBuilder.environment();
+        env.put("PYTHONPATH", workingDir.getAbsolutePath());
+        env.put("TF_CONFIG", formatTfConfigVar());
+        env.put("TF_WORKERS", formatTfWorkersVar());
+        env.put("TF_CHIEF_SERVER", formatTfChiefServerVar());
+
+        return new NativeProcessRunner(procBuilder, null, out, err);
+    }
+
+    /**
+     * Formats "TF_CONFIG" variable to be passed into user script.
+     *
+     * @return Formatted "TF_CONFIG" variable to be passed into user script.
+     */
+    private String formatTfConfigVar() {
+        // The instance passed into the constructor is used (not the default one) to support named instances.
+        return new StringBuilder()
+            .append("{\"cluster\" : ")
+            .append(clusterSpec.format(ignite))
+            .append(", ")
+            .append("\"task\": {\"type\" : \"" + TensorFlowClusterResolver.CHIEF_JOB_NAME + "\", \"index\": 0}}")
+            .toString();
+    }
+
+    /**
+     * Formats "TF_WORKERS" variable to be passed into user script.
+     *
+     * @return Formatted "TF_WORKERS" variable to be passed into user script.
+     */
+    private String formatTfWorkersVar() {
+        List<TensorFlowServerAddressSpec> workers =
+            clusterSpec.getJobs().get(TensorFlowClusterResolver.WORKER_JOB_NAME);
+
+        if (workers == null)
+            throw new IllegalStateException("TensorFlow cluster specification should contain worker tasks");
+
+        StringJoiner joiner = new StringJoiner(", ");
+
+        int cnt = workers.size();
+        for (int i = 0; i < cnt; i++)
+            joiner.add("\"/job:" + TensorFlowClusterResolver.WORKER_JOB_NAME + "/task:" + i + "\"");
+
+        return "[" + joiner + "]";
+    }
+
+    /**
+     * Formats "TF_CHIEF_SERVER" variable to be passed into user script.
+     *
+     * @return Formatted "TF_CHIEF_SERVER" variable to be passed into user script.
+     */
+    private String formatTfChiefServerVar() {
+        List<TensorFlowServerAddressSpec> tasks = clusterSpec.getJobs().get(TensorFlowClusterResolver.CHIEF_JOB_NAME);
+
+        if (tasks == null || tasks.size() != 1)
+            throw new IllegalStateException("TensorFlow cluster specification should contain exactly one chief task");
+
+        TensorFlowServerAddressSpec addrSpec = tasks.iterator().next();
+
+        return "grpc://" + addrSpec.format(ignite);
+    }
+
+    /**
+     * Deletes given file or directory recursively. Symbolic links are deleted themselves and are never followed.
+     *
+     * @param file File or directory to be deleted.
+     */
+    private void delete(File file) {
+        // A link to a directory must not be traversed, otherwise files outside of the tree could be removed.
+        if (file.isDirectory() && !Files.isSymbolicLink(file.toPath())) {
+            String[] files = file.list();
+
+            if (files != null)
+                for (String fileToBeDeleted : files)
+                    delete(new File(file, fileToBeDeleted));
+
+            if (!file.delete())
+                throw new IllegalStateException("Can't delete directory [path=" + file.getAbsolutePath() + "]");
+        }
+        else {
+            if (!file.delete())
+                throw new IllegalStateException("Can't delete file [path=" + file.getAbsolutePath() + "]");
+        }
+    }
+
+    /**
+     * Extracts specified zip archive into specified directory. Entries that would be placed outside of the target
+     * directory (for example, with names containing "..") are rejected.
+     *
+     * @param data Zip archive to be extracted.
+     * @param extractTo Target directory.
+     */
+    private void unzip(byte[] data, File extractTo) {
+        try (ZipInputStream zipStream = new ZipInputStream(new ByteArrayInputStream(data))) {
+            File root = extractTo.getCanonicalFile();
+
+            ZipEntry entry;
+            while ((entry = zipStream.getNextEntry()) != null) {
+                File file = new File(root, entry.getName()).getCanonicalFile();
+
+                // Protection against "Zip Slip": canonical path of the entry has to stay inside of the target.
+                if (!file.toPath().startsWith(root.toPath()))
+                    throw new IllegalStateException("Zip entry is outside of the target directory [entry=" +
+                        entry.getName() + "]");
+
+                if (entry.isDirectory()) {
+                    // An already existing directory is fine, it must not be treated as a regular file.
+                    if (!file.isDirectory() && !file.mkdirs())
+                        throw new IllegalStateException("Can't create directory [path=" + file.getAbsolutePath() + "]");
+                }
+                else {
+                    File parent = file.getParentFile();
+
+                    if (!parent.isDirectory() && !parent.mkdirs())
+                        throw new IllegalStateException("Can't create directory [path=" +
+                            parent.getAbsolutePath() + "]");
+
+                    try (BufferedOutputStream fileOut = new BufferedOutputStream(new FileOutputStream(file))) {
+                        IOUtils.copy(zipStream, fileOut);
+                    }
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
index 0ef81bc..c825448 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManager.java
@@ -17,18 +17,17 @@
package org.apache.ignite.tensorflow.core;
-import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
/**
* Process manager that allows to run and maintain processes in the cluster.
*
* @param <R> Type of task to be run.
*/
-public interface ProcessManager<R> extends Serializable {
+public interface ProcessManager<R> {
/**
* Starts the processes by the given specifications.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
index b66b54f..4f10e83 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/ProcessManagerWrapper.java
@@ -17,11 +17,11 @@
package org.apache.ignite.tensorflow.core;
-import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
/**
* Process manager wrapper that allows to define how one type of process specification should be transformed into
@@ -31,9 +31,6 @@ import java.util.UUID;
* @param <R> Type of accepted process specifications.
*/
public abstract class ProcessManagerWrapper<T, R> implements ProcessManager<R> {
- /** */
- private static final long serialVersionUID = -6397225095261457524L;
-
/** Delegate. */
private final ProcessManager<T> delegate;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
index 027ece3..a25ff97 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.tensorflow.core.longrunning;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,7 +24,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
@@ -41,22 +39,18 @@ import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProces
* Long running process manager that allows to start, stop and make other actions with long running processes.
*/
public class LongRunningProcessManager implements ProcessManager<LongRunningProcess> {
- /** */
- private static final long serialVersionUID = 1151455641358063287L;
-
- /** Ignite instance supplier. */
- private final Supplier<Ignite> igniteSupplier;
+ /** Ignite instance. */
+ private final Ignite ignite;
/**
* Constructs a new instance of long running process manager.
*
- * @param igniteSupplier Ignite instance supplier.
- * @param <T> Type of serializable supplier.
+ * @param ignite Ignite instance.
*/
- public <T extends Supplier<Ignite> & Serializable> LongRunningProcessManager(T igniteSupplier) {
- assert igniteSupplier != null : "Ignite supplier should not be null";
+ public LongRunningProcessManager(Ignite ignite) {
+ assert ignite != null : "Ignite instance should not be null";
- this.igniteSupplier = igniteSupplier;
+ this.ignite = ignite;
}
/** {@inheritDoc} */
@@ -100,7 +94,6 @@ public class LongRunningProcessManager implements ProcessManager<LongRunningProc
List<T> nodeProcesses = params.get(nodeId);
LongRunningProcessTask<List<E>> task = taskSupplier.apply(nodeProcesses);
- Ignite ignite = igniteSupplier.get();
ClusterGroup clusterGrp = ignite.cluster().forNodeId(nodeId);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
index 1d08519..04f90d3 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/longrunning/task/LongRunningProcessStartTask.java
@@ -17,13 +17,13 @@
package org.apache.ignite.tensorflow.core.longrunning.task;
-import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.ignite.tensorflow.core.longrunning.LongRunningProcess;
import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory;
/**
@@ -78,7 +78,7 @@ public class LongRunningProcessStartTask extends LongRunningProcessTask<List<UUI
*/
private Future<?> runTask(Runnable task) {
return Executors
- .newSingleThreadExecutor(new CustomizableThreadFactory("LONG_RUNNING_PROCESS_TASK", true))
+ .newSingleThreadExecutor(new CustomizableThreadFactory("tf-long-running", true))
.submit(task);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
index df36ba9..2ad3c9d 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcess.java
@@ -20,6 +20,7 @@ package org.apache.ignite.tensorflow.core.nativerunning;
import java.io.Serializable;
import java.util.UUID;
import java.util.function.Supplier;
+import org.apache.ignite.tensorflow.util.SerializableSupplier;
/**
* Native process specification.
@@ -29,7 +30,7 @@ public class NativeProcess implements Serializable {
private static final long serialVersionUID = -7056800139746134956L;
/** Process builder supplier. */
- private final Supplier<ProcessBuilder> procBuilderSupplier;
+ private final SerializableSupplier<ProcessBuilder> procBuilderSupplier;
/** Stdin of the process. */
private final String stdin;
@@ -44,8 +45,7 @@ public class NativeProcess implements Serializable {
* @param stdin Stdin of the process.
* @param nodeId Node identifier.
*/
- public <T extends Supplier<ProcessBuilder> & Serializable> NativeProcess(T procBuilderSupplier, String stdin,
- UUID nodeId) {
+ public NativeProcess(SerializableSupplier<ProcessBuilder> procBuilderSupplier, String stdin, UUID nodeId) {
assert procBuilderSupplier != null : "Process builder supplier should not be null";
assert nodeId != null : "Node identifier should not be null";
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
index 60cd89b..5accf3d 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/NativeProcessManager.java
@@ -17,8 +17,6 @@
package org.apache.ignite.tensorflow.core.nativerunning;
-import java.io.Serializable;
-import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.tensorflow.core.ProcessManager;
import org.apache.ignite.tensorflow.core.ProcessManagerWrapper;
@@ -30,17 +28,13 @@ import org.apache.ignite.tensorflow.core.nativerunning.task.NativeProcessStartTa
* Native process manager that allows to start, stop and make other actions with native processes.
*/
public class NativeProcessManager extends ProcessManagerWrapper<LongRunningProcess, NativeProcess> {
- /** */
- private static final long serialVersionUID = 718119807915504045L;
-
/**
* Constructs a new native process manager.
*
- * @param igniteSupplier Ignite instance supplier.
- * @param <T> Type of serializable supplier.
+ * @param ignite Ignite instance.
*/
- public <T extends Supplier<Ignite> & Serializable> NativeProcessManager(T igniteSupplier) {
- super(new LongRunningProcessManager(igniteSupplier));
+ public NativeProcessManager(Ignite ignite) {
+ super(new LongRunningProcessManager(ignite));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
index 8fc28a5..ae9e2b9 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/nativerunning/task/NativeProcessStartTask.java
@@ -18,16 +18,11 @@
package org.apache.ignite.tensorflow.core.nativerunning.task;
import java.util.function.Supplier;
-import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.Scanner;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.tensorflow.core.util.CustomizableThreadFactory;
+import org.apache.ignite.tensorflow.core.nativerunning.NativeProcess;
+import org.apache.ignite.tensorflow.core.util.NativeProcessRunner;
/**
* Task that starts native process by its specification.
@@ -55,62 +50,26 @@ public class NativeProcessStartTask implements IgniteRunnable {
Supplier<ProcessBuilder> procBuilderSupplier = procSpec.getProcBuilderSupplier();
ProcessBuilder procBuilder = procBuilderSupplier.get();
- Process proc;
- try {
- proc = procBuilder.start();
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- Thread shutdownHook = new Thread(proc::destroy);
- Runtime.getRuntime().addShutdownHook(shutdownHook);
+ NativeProcessRunner procRunner = new NativeProcessRunner(
+ procBuilder,
+ procSpec.getStdin(),
+ System.out::println,
+ System.err::println
+ );
- Future<?> outForward = forwardStream(proc.getInputStream(), System.out);
- Future<?> errForward = forwardStream(proc.getErrorStream(), System.err);
+ IgniteLogger log = Ignition.ignite().log().getLogger(NativeProcessStartTask.class);
try {
- if (procSpec.getStdin() != null) {
- PrintWriter writer = new PrintWriter(proc.getOutputStream());
- writer.println(procSpec.getStdin());
- writer.flush();
- }
-
- int status;
- try {
- status = proc.waitFor();
- }
- catch (InterruptedException e) {
- proc.destroy();
- status = proc.exitValue();
- }
-
- Runtime.getRuntime().removeShutdownHook(shutdownHook);
-
- if (status != 0)
- throw new IllegalStateException("Native process exit status is " + status);
+ log.debug("Starting native process");
+ procRunner.startAndWait();
+ log.debug("Native process completed");
}
- finally {
- outForward.cancel(true);
- errForward.cancel(true);
+ catch (InterruptedException e) {
+ log.debug("Native process interrupted");
+ }
+ catch (Exception e) {
+ log.error("Native process failed", e);
+ throw e;
}
- }
-
- /**
- * Forwards stream.
- *
- * @param src Source stream.
- * @param dst Destination stream.
- * @return Future that allows to interrupt forwarding.
- */
- private Future<?> forwardStream(InputStream src, PrintStream dst) {
- return Executors
- .newSingleThreadExecutor(new CustomizableThreadFactory("NATIVE_PROCESS_FORWARD_STREAM", true))
- .submit(() -> {
- Scanner scanner = new Scanner(src);
-
- while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine())
- dst.println(scanner.nextLine());
- });
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
new file mode 100644
index 0000000..e59ab00
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessBuilderSupplier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.core.pythonrunning;
+
+import org.apache.ignite.tensorflow.util.SerializableSupplier;
+
+/**
+ * Python process builder supplier that is used to create Python process builder.
+ */
+public class PythonProcessBuilderSupplier implements SerializableSupplier<ProcessBuilder> {
+    /** */
+    private static final long serialVersionUID = 7181937306294456125L;
+
+    /** Name of the environment variable that specifies the Python interpreter to be used. */
+    private static final String PYTHON_ENV_NAME = "PYTHON";
+
+    /** Interactive flag (allows to use standard input to pass Python script). */
+    private final boolean interactive;
+
+    /**
+     * Constructs a new instance of Python process builder supplier.
+     *
+     * @param interactive Interactive flag (allows to use standard input to pass Python script).
+     */
+    public PythonProcessBuilderSupplier(boolean interactive) {
+        this.interactive = interactive;
+    }
+
+    /**
+     * Returns process builder to be used to start Python process. The interpreter is taken from the "PYTHON"
+     * environment variable, if the variable is not specified or empty then "python3" is used. In interactive mode
+     * the interpreter is started with the "-i" flag.
+     *
+     * @return Process builder to be used to start Python process.
+     */
+    @Override public ProcessBuilder get() {
+        String python = System.getenv(PYTHON_ENV_NAME);
+
+        // Empty value can't be a valid executable name, so it's treated as a non-specified variable.
+        if (python == null || python.isEmpty())
+            python = "python3";
+
+        return interactive ? new ProcessBuilder(python, "-i") : new ProcessBuilder(python);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
index de35ff9..1f6c11e 100644
--- a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/pythonrunning/PythonProcessManager.java
@@ -17,8 +17,6 @@
package org.apache.ignite.tensorflow.core.pythonrunning;
-import java.io.Serializable;
-import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.tensorflow.core.ProcessManager;
import org.apache.ignite.tensorflow.core.ProcessManagerWrapper;
@@ -29,17 +27,13 @@ import org.apache.ignite.tensorflow.core.nativerunning.NativeProcessManager;
* Python process manager that allows to start, stop and make other actions with python processes.
*/
public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, PythonProcess> {
- /** */
- private static final long serialVersionUID = -7095409565854538504L;
-
/**
* Constructs a new instance of python process manager.
*
- * @param igniteSupplier Ignite instance supplier.
- * @param <T> Type of serializable supplier.
+ * @param ignite Ignite instance.
*/
- public <T extends Supplier<Ignite> & Serializable> PythonProcessManager(T igniteSupplier) {
- this(new NativeProcessManager(igniteSupplier));
+ public PythonProcessManager(Ignite ignite) {
+ this(new NativeProcessManager(ignite));
}
/**
@@ -54,30 +48,9 @@ public class PythonProcessManager extends ProcessManagerWrapper<NativeProcess, P
/** {@inheritDoc} */
@Override protected NativeProcess transformSpecification(PythonProcess spec) {
return new NativeProcess(
- new PythonProcessBuilderSupplier(),
+ new PythonProcessBuilderSupplier(true),
spec.getStdin(),
spec.getNodeId()
);
}
-
- /**
- * Python process builder supplier that is used to create Python process builder.
- */
- private static class PythonProcessBuilderSupplier implements Supplier<ProcessBuilder>, Serializable {
- /** */
- private static final long serialVersionUID = 8497087649461965914L;
-
- /** Python environment variable name. */
- private static final String PYTHON_ENV_NAME = "PYTHON";
-
- /** {@inheritDoc} */
- @Override public ProcessBuilder get() {
- String python = System.getenv(PYTHON_ENV_NAME);
-
- if (python == null)
- python = "python3";
-
- return new ProcessBuilder(python, "-i");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java
new file mode 100644
index 0000000..b336b97
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/AsyncNativeProcessRunner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.core.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+
+/**
+ * Asynchronous native process runner.
+ */
+public abstract class AsyncNativeProcessRunner {
+ /** Ignite logger. */
+ private final IgniteLogger log;
+
+ /** Executors that is used to start async native process. */
+ private final ExecutorService executor;
+
+ /** Future of the async process process. */
+ private Future<?> fut;
+
+ /**
+ * Constructs a new asynchronous native process runner.
+ *
+ * @param ignite Ignite instance.
+ * @param executor Executor.
+ */
+ public AsyncNativeProcessRunner(Ignite ignite, ExecutorService executor) {
+ this.log = ignite.log().getLogger(AsyncNativeProcessRunner.class);
+ this.executor = executor;
+ }
+
+ /**
+ * Method that should be called before starting the process.
+ *
+ * @return Prepared native process runner.
+ */
+ public abstract NativeProcessRunner doBefore();
+
+ /**
+ * Method that should be called after starting the process.
+ */
+ public abstract void doAfter();
+
+ /**
+ * Starts the process in separate thread.
+ */
+ public synchronized void start() {
+ if (fut != null)
+ throw new IllegalStateException("Async native process has already been started");
+
+ NativeProcessRunner procRunner = doBefore();
+
+ fut = executor.submit(() -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ log.debug("Starting native process");
+ procRunner.startAndWait();
+ log.debug("Native process completed");
+ break;
+ }
+ catch (InterruptedException e) {
+ log.debug("Native process interrupted");
+ break;
+ }
+ catch (Exception e) {
+ log.error("Native process failed", e);
+ }
+ }
+
+ doAfter();
+ });
+ }
+
+ /**
+ * Stops the process.
+ */
+ public synchronized void stop() {
+ if (fut != null && !fut.isDone())
+ fut.cancel(true);
+ }
+
+ /**
+ * Checks if process is already completed.
+ *
+ * @return {@code true} if process completed, otherwise {@code false}.
+ */
+ public boolean isCompleted() {
+ return fut != null && fut.isDone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java
new file mode 100644
index 0000000..38af26d
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/core/util/NativeProcessRunner.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.core.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Scanner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Utils class that helps to start native processes.
+ */
+public class NativeProcessRunner {
+    /** Thread name to be used by threads that forward streams. */
+    private static final String NATIVE_PROCESS_FORWARD_STREAM_THREAD_NAME = "tf-forward-native-output";
+
+    /** Process builder. */
+    private final ProcessBuilder procBuilder;
+
+    /** Standard input of the process. */
+    private final String stdin;
+
+    /** Output stream data consumer. */
+    private final Consumer<String> out;
+
+    /** Error stream data consumer. */
+    private final Consumer<String> err;
+
+    /**
+     * Constructs a new instance of native process runner.
+     *
+     * @param procBuilder Process builder.
+     * @param stdin Standard input of the process ({@code null} if nothing should be written).
+     * @param out Output stream data consumer.
+     * @param err Error stream data consumer.
+     */
+    public NativeProcessRunner(ProcessBuilder procBuilder, String stdin, Consumer<String> out, Consumer<String> err) {
+        this.procBuilder = procBuilder;
+        this.stdin = stdin;
+        this.out = out;
+        this.err = err;
+    }
+
+    /**
+     * Starts the native process and waits for it to complete, successfully or with an exception.
+     * <p>
+     * While the process is running its standard output and error streams are forwarded line by line to the
+     * configured consumers, and a JVM shutdown hook is registered that destroys the process. The hook is
+     * unregistered again when this method is left, whatever the outcome.
+     *
+     * @throws InterruptedException If the calling thread is interrupted while waiting; the process is destroyed
+     *      before the exception is rethrown.
+     * @throws RuntimeException If the process can't be started.
+     * @throws IllegalStateException If the process exits with non-zero status while JVM is not shutting down.
+     */
+    public void startAndWait() throws InterruptedException {
+        Process proc;
+        try {
+            proc = procBuilder.start();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        AtomicBoolean shutdown = new AtomicBoolean();
+
+        Thread shutdownHook = new Thread(() -> {
+            shutdown.set(true);
+            proc.destroy();
+        });
+
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        Future<?> outForward = forwardStream(proc.getInputStream(), out);
+        Future<?> errForward = forwardStream(proc.getErrorStream(), err);
+
+        try {
+            if (stdin != null) {
+                PrintWriter writer = new PrintWriter(proc.getOutputStream());
+                writer.println(stdin);
+                writer.flush();
+            }
+
+            int status;
+            try {
+                status = proc.waitFor();
+            }
+            catch (InterruptedException e) {
+                proc.destroy();
+                throw e;
+            }
+
+            // A non-zero status caused by the shutdown hook destroying the process is not reported as a failure.
+            if (!shutdown.get() && status != 0)
+                throw new IllegalStateException("Native process exit [status=" + status + "]");
+        }
+        finally {
+            outForward.cancel(true);
+            errForward.cancel(true);
+
+            // Unregister on every exit path (including interruption), otherwise each run leaks a hook.
+            removeShutdownHook(shutdownHook);
+        }
+    }
+
+    /**
+     * Unregisters the specified shutdown hook, tolerating the case when JVM shutdown is already in progress.
+     *
+     * @param hook Shutdown hook to be unregistered.
+     */
+    private static void removeShutdownHook(Thread hook) {
+        try {
+            Runtime.getRuntime().removeShutdownHook(hook);
+        }
+        catch (IllegalStateException ignored) {
+            // JVM is shutting down: hooks can no longer be removed and this one is running or about to run.
+        }
+    }
+
+    /**
+     * Forwards stream line by line in a dedicated thread until the stream ends or the thread is interrupted.
+     *
+     * @param src Source stream.
+     * @param dst Destination consumer that receives every line.
+     * @return Future that allows to interrupt forwarding.
+     */
+    private Future<?> forwardStream(InputStream src, Consumer<String> dst) {
+        java.util.concurrent.ExecutorService exec = Executors.newSingleThreadExecutor(
+            new CustomizableThreadFactory(NATIVE_PROCESS_FORWARD_STREAM_THREAD_NAME, true)
+        );
+
+        try {
+            return exec.submit(() -> {
+                Scanner scanner = new Scanner(src);
+
+                while (!Thread.currentThread().isInterrupted() && scanner.hasNextLine())
+                    dst.accept(scanner.nextLine());
+            });
+        }
+        finally {
+            // The executor serves exactly one task. Orderly shutdown lets that task run to completion (or be
+            // cancelled through the returned future) and then releases the worker thread.
+            exec.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java
new file mode 100644
index 0000000..0a7cae6
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/JobSubmitter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter;
+
+import org.apache.ignite.tensorflow.submitter.command.RootCommand;
+import picocli.CommandLine;
+
+/**
+ * Main class of the job submitter application that allows to submit TensorFlow jobs to be run within Ignite cluster.
+ */
+public class JobSubmitter {
+ /**
+ * Main method.
+ *
+ * @param args Arguments.
+ */
+ public static void main(String... args) {
+ CommandLine.run(new RootCommand(), System.out, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java
new file mode 100644
index 0000000..4d2fc18
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AbstractCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.logger.slf4j.Slf4jLogger;
+import picocli.CommandLine;
+
+/**
+ * Base class of all commands that holds the options they have in common.
+ */
+public abstract class AbstractCommand implements Runnable {
+    /** Ignite node configuration path. */
+    @CommandLine.Option(names = { "-c", "--config" }, description = "Apache Ignite client configuration.")
+    protected String cfg;
+
+    /**
+     * Starts and returns an Ignite instance. If {@link #cfg} is specified the instance is started from it,
+     * otherwise a default client mode configuration with SLF4J logger is used.
+     *
+     * @return Ignite instance.
+     */
+    protected Ignite getIgnite() {
+        if (cfg == null) {
+            // No configuration given: fall back to a client node with default settings.
+            IgniteConfiguration dfltCfg = new IgniteConfiguration();
+
+            dfltCfg.setClientMode(true);
+            dfltCfg.setGridLogger(new Slf4jLogger());
+
+            return Ignition.start(dfltCfg);
+        }
+
+        return Ignition.start(cfg);
+    }
+
+    /**
+     * Sets Ignite node configuration path.
+     *
+     * @param cfg Ignite node configuration path.
+     */
+    public void setCfg(String cfg) {
+        this.cfg = cfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java
new file mode 100644
index 0000000..946aa08
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/AttachCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager;
+import picocli.CommandLine;
+
+/**
+ * Command "attach": attaches to a running TensorFlow cluster and prints the output of its user script.
+ */
+@CommandLine.Command(
+    name = "attach",
+    description = "Attaches to running TensorFlow cluster (user script process).",
+    mixinStandardHelpOptions = true
+)
+public class AttachCommand extends AbstractCommand {
+    /** TensorFlow cluster identifier. */
+    @CommandLine.Parameters(paramLabel = "CLUSTER_ID", description = "Cluster identifier.")
+    private UUID clusterId;
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            // User script output goes to standard output, its errors to standard error.
+            new TensorFlowClusterGatewayManager(ignite)
+                .listenToClusterUserScript(clusterId, System.out::println, System.err::println);
+        }
+    }
+
+    /**
+     * Sets TensorFlow cluster identifier.
+     *
+     * @param clusterId TensorFlow cluster identifier.
+     */
+    public void setClusterId(UUID clusterId) {
+        this.clusterId = clusterId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java
new file mode 100644
index 0000000..0538496
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/PsCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowCluster;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterManager;
+import picocli.CommandLine;
+
+/**
+ * Command "ps": prints the identifier of every running TensorFlow cluster, one per line.
+ */
+@CommandLine.Command(
+    name = "ps",
+    description = "Prints identifiers of all running TensorFlow clusters.",
+    mixinStandardHelpOptions = true
+)
+public class PsCommand extends AbstractCommand {
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            Map<UUID, TensorFlowCluster> clusters = new TensorFlowClusterManager(ignite).getAllClusters();
+
+            // Only the identifiers (map keys) are printed.
+            clusters.keySet().forEach(System.out::println);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java
new file mode 100644
index 0000000..508ea7b
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/RootCommand.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import picocli.CommandLine;
+
+/**
+ * Root command of the command line tool. It aggregates all sub commands and, when run itself, prints the usage
+ * help.
+ */
+@CommandLine.Command(
+    name = "ignite-tf",
+    mixinStandardHelpOptions = true,
+    description = "Apache Ignite and TensorFlow integration command line tool " +
+        "that allows to start, maintain and stop distributed deep learning " +
+        "utilizing Apache Ignite infrastructure and data.",
+    subcommands = {
+        StartCommand.class,
+        StopCommand.class,
+        AttachCommand.class,
+        PsCommand.class
+    }
+)
+public class RootCommand extends AbstractCommand {
+    /** {@inheritDoc} */
+    @Override public void run() {
+        // No action of its own: show the usage help on standard output.
+        CommandLine.usage(this, System.out);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java
new file mode 100644
index 0000000..082b363
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StartCommand.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager;
+import org.apache.ignite.tensorflow.cluster.TensorFlowJobArchive;
+import picocli.CommandLine;
+
+/**
+ * Command "start" that is used to start a new TensorFlow cluster on top of Apache Ignite.
+ */
+@CommandLine.Command(
+    name = "start",
+    description = "Starts a new TensorFlow cluster and attaches to user script process.",
+    mixinStandardHelpOptions = true
+)
+public class StartCommand extends AbstractCommand {
+    /** Upstream cache name. */
+    @CommandLine.Parameters(index = "0", paramLabel = "CACHE_NAME", description = "Upstream cache name.")
+    private String cacheName;
+
+    /** Job folder or archive. */
+    @CommandLine.Parameters(index = "1", paramLabel = "JOB_DIR", description = "Job folder (or zip archive).")
+    private String jobFolder;
+
+    /** Job command to be executed in cluster. */
+    @CommandLine.Parameters(index = "2", paramLabel = "JOB_CMD", description = "Job command.")
+    private String jobCmd;
+
+    /** Arguments of a job command to be executed in cluster ({@code null} if none were specified). */
+    @CommandLine.Parameters(index = "3..*", paramLabel = "JOB_ARGS", description = "Job arguments.")
+    private String[] jobArguments;
+
+    /**
+     * Packs the job into an archive, creates a new TensorFlow cluster with a random identifier for it and then
+     * forwards the output of the user script to standard output and standard error.
+     *
+     * @throws RuntimeException If the job folder or archive can't be read.
+     * @throws IllegalArgumentException If the job folder or archive doesn't exist.
+     */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            UUID clusterId = UUID.randomUUID();
+
+            // The array stays unset when the job command is given without arguments.
+            String[] args = jobArguments != null ? jobArguments : new String[0];
+
+            // Full command line of the job: command itself followed by its arguments.
+            String[] commands = new String[args.length + 1];
+            commands[0] = jobCmd;
+            System.arraycopy(args, 0, commands, 1, args.length);
+
+            TensorFlowJobArchive jobArchive = new TensorFlowJobArchive(
+                cacheName,
+                zip(jobFolder),
+                commands
+            );
+
+            TensorFlowClusterGatewayManager mgr = new TensorFlowClusterGatewayManager(ignite);
+            mgr.createCluster(clusterId, jobArchive);
+
+            mgr.listenToClusterUserScript(clusterId, System.out::println, System.err::println);
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Archives specified folder or file into zip archive. A directory is compressed recursively, a file whose name
+     * ends with ".zip" is read as is and any other file is compressed as a single entry.
+     *
+     * @param jobArchivePath Path to folder or file to be archived.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     * @throws IllegalArgumentException If the specified path doesn't exist.
+     */
+    private byte[] zip(String jobArchivePath) throws IOException {
+        Path path = Paths.get(jobArchivePath);
+        File file = path.toFile();
+
+        if (!file.exists())
+            throw new IllegalArgumentException("File doesn't exist [name=" + jobArchivePath + "]");
+
+        if (file.isDirectory())
+            return zipDirectory(file);
+        else if (jobArchivePath.endsWith(".zip"))
+            return zipArchive(file);
+        else
+            return zipFile(file);
+    }
+
+    /**
+     * Archives specified folder into zip archive.
+     *
+     * @param dir Directory to be archived.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zipDirectory(File dir) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (ZipOutputStream zipFile = new ZipOutputStream(baos)) {
+            compressDirectoryToZip(dir.getAbsolutePath(), dir.getAbsolutePath(), zipFile);
+        }
+
+        return baos.toByteArray();
+    }
+
+    /**
+     * Archives specified file into zip archive that contains a single entry named after the file.
+     *
+     * @param file File to be archived.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zipFile(File file) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (ZipOutputStream zos = new ZipOutputStream(baos)) {
+            ZipEntry entry = new ZipEntry(file.getName());
+            zos.putNextEntry(entry);
+
+            try (FileInputStream in = new FileInputStream(file.getAbsolutePath())) {
+                IOUtils.copy(in, zos);
+            }
+        }
+
+        return baos.toByteArray();
+    }
+
+    /**
+     * Reads zip archive into byte array and returns this array.
+     *
+     * @param file Archive to be read.
+     * @return Byte array representing zip archive.
+     * @throws IOException In case of input/output exception.
+     */
+    private byte[] zipArchive(File file) throws IOException {
+        try (FileInputStream fis = new FileInputStream(file)) {
+            return IOUtils.toByteArray(fis);
+        }
+    }
+
+    /**
+     * Archives specified folder into zip output stream. Sub directories are processed recursively; only files
+     * produce entries. Entry names are paths relative to the root directory, start with "/" and always use "/" as
+     * the separator regardless of the platform.
+     *
+     * @param rootDir Root directory.
+     * @param srcDir Source directory (the root directory itself or one of its descendants).
+     * @param out Zip output stream.
+     * @throws IOException In case of input/output exception.
+     */
+    private void compressDirectoryToZip(String rootDir, String srcDir, ZipOutputStream out) throws IOException {
+        File[] files = new File(srcDir).listFiles();
+
+        if (files == null)
+            return;
+
+        // Strip the root only as a leading prefix: a nested directory may repeat the root path in its own path.
+        String relDir = srcDir.startsWith(rootDir) ? srcDir.substring(rootDir.length()) : srcDir;
+
+        // Zip entry names use forward slashes on every platform.
+        relDir = relDir.replace(File.separatorChar, '/');
+
+        for (File file : files) {
+            String filePath = srcDir + File.separator + file.getName();
+
+            if (file.isDirectory())
+                compressDirectoryToZip(rootDir, filePath, out);
+            else {
+                out.putNextEntry(new ZipEntry(relDir + '/' + file.getName()));
+
+                try (FileInputStream in = new FileInputStream(filePath)) {
+                    IOUtils.copy(in, out);
+                }
+
+                out.closeEntry();
+            }
+        }
+    }
+
+    /**
+     * Sets upstream cache name.
+     *
+     * @param cacheName Upstream cache name.
+     */
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * Sets job folder or archive.
+     *
+     * @param jobFolder Job folder or archive.
+     */
+    public void setJobFolder(String jobFolder) {
+        this.jobFolder = jobFolder;
+    }
+
+    /**
+     * Sets job command to be executed in cluster.
+     *
+     * @param jobCmd Job command.
+     */
+    public void setJobCmd(String jobCmd) {
+        this.jobCmd = jobCmd;
+    }
+
+    /**
+     * Sets arguments of a job command to be executed in cluster.
+     *
+     * @param jobArguments Job arguments ({@code null} is treated as no arguments).
+     */
+    public void setJobArguments(String[] jobArguments) {
+        this.jobArguments = jobArguments;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java
new file mode 100644
index 0000000..8890370
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/StopCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.submitter.command;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.tensorflow.cluster.TensorFlowClusterGatewayManager;
+import picocli.CommandLine;
+
+/**
+ * Command "stop": stops the TensorFlow cluster with the given identifier if such cluster exists.
+ */
+@CommandLine.Command(
+    name = "stop",
+    description = "Stops a running TensorFlow cluster.",
+    mixinStandardHelpOptions = true
+)
+public class StopCommand extends AbstractCommand {
+    /** Cluster identifier. */
+    @CommandLine.Parameters(paramLabel = "CLUSTER_ID", description = "Cluster identifier.")
+    private UUID clusterId;
+
+    /** {@inheritDoc} */
+    @Override public void run() {
+        try (Ignite ignite = getIgnite()) {
+            new TensorFlowClusterGatewayManager(ignite).stopClusterIfExists(clusterId);
+        }
+    }
+
+    /**
+     * Sets cluster identifier.
+     *
+     * @param clusterId Cluster identifier.
+     */
+    public void setClusterId(UUID clusterId) {
+        this.clusterId = clusterId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java
new file mode 100644
index 0000000..7949feb
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/command/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * This package contains the commands provided by the command line tool. The picocli library is used to keep these
+ * commands maintainable.
+ */
+package org.apache.ignite.tensorflow.submitter.command;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java
new file mode 100644
index 0000000..8288b16
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/submitter/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * This package contains classes that provide a command line interface for submitting TensorFlow jobs to the Apache
+ * Ignite infrastructure.
+ */
+package org.apache.ignite.tensorflow.submitter;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java
new file mode 100644
index 0000000..ece58aa
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableConsumer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.util;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/**
+ * Serializable consumer: a {@link Consumer} that is also {@link Serializable}. The interface declares no methods of
+ * its own.
+ *
+ * @param <T> The type of the input to the operation.
+ */
+public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java
new file mode 100644
index 0000000..768dbe1
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/SerializableSupplier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tensorflow.util;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Serializable supplier: a {@link Supplier} that is also {@link Serializable}. The interface declares no methods of
+ * its own.
+ *
+ * @param <T> The type of results supplied by this supplier.
+ */
+public interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java
new file mode 100644
index 0000000..8ed43c3
--- /dev/null
+++ b/modules/tensorflow/src/main/java/org/apache/ignite/tensorflow/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Utility classes used in the {@link org.apache.ignite.tensorflow} package.
+ */
+package org.apache.ignite.tensorflow.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/sh/ignite-tf.sh
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/sh/ignite-tf.sh b/modules/tensorflow/src/main/sh/ignite-tf.sh
new file mode 100755
index 0000000..fd3e02c
--- /dev/null
+++ b/modules/tensorflow/src/main/sh/ignite-tf.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+SCRIPT_PATH="$( cd "$(dirname "$0")" && pwd -P )" || exit 1  # Physical directory of this script; abort instead of falling back to the caller's directory if cd fails.
+exec java -Xmx4G -DIGNITE_QUIET=false -cp "$SCRIPT_PATH:$SCRIPT_PATH/lib/*" org.apache.ignite.tensorflow.submitter.JobSubmitter "$@"  # exec: the JVM replaces this shell, so signals and the exit status are not relayed through a wrapper process.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/main/sh/logback.xml
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/main/sh/logback.xml b/modules/tensorflow/src/main/sh/logback.xml
new file mode 100644
index 0000000..816b5e6
--- /dev/null
+++ b/modules/tensorflow/src/main/sh/logback.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+ Logback configuration: sends WARN and higher events to the file ignite-tf.log, truncating any existing file instead of appending.
+-->
+<configuration>
+
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>ignite-tf.log</file>
+ <append>false</append>
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="warn">
+ <appender-ref ref="FILE" />
+ </root>
+
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/9e884e5a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
----------------------------------------------------------------------
diff --git a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
index 7d917e7..faa2b6b 100644
--- a/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
+++ b/modules/tensorflow/src/test/java/org/apache/ignite/tensorflow/core/longrunning/LongRunningProcessManagerTest.java
@@ -17,13 +17,11 @@
package org.apache.ignite.tensorflow.core.longrunning;
-import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteCompute;
@@ -67,7 +65,7 @@ public class LongRunningProcessManagerTest {
List<LongRunningProcess> list = Collections.singletonList(new LongRunningProcess(nodeId, () -> {}));
- LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+ LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
Map<UUID, List<UUID>> res = mgr.start(list);
assertEquals(1, res.size());
@@ -97,7 +95,7 @@ public class LongRunningProcessManagerTest {
Map<UUID, List<UUID>> procIds = new HashMap<>();
procIds.put(nodeId, Collections.singletonList(procId));
- LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+ LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
Map<UUID, List<LongRunningProcessStatus>> res = mgr.ping(procIds);
assertEquals(1, res.size());
@@ -127,7 +125,7 @@ public class LongRunningProcessManagerTest {
Map<UUID, List<UUID>> procIds = new HashMap<>();
procIds.put(nodeId, Collections.singletonList(procId));
- LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+ LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
Map<UUID, List<LongRunningProcessStatus>> res = mgr.stop(procIds, true);
assertEquals(1, res.size());
@@ -157,7 +155,7 @@ public class LongRunningProcessManagerTest {
Map<UUID, List<UUID>> procIds = new HashMap<>();
procIds.put(nodeId, Collections.singletonList(procId));
- LongRunningProcessManager mgr = new LongRunningProcessManager((Supplier<Ignite> & Serializable)() -> ignite);
+ LongRunningProcessManager mgr = new LongRunningProcessManager(ignite);
Map<UUID, List<LongRunningProcessStatus>> res = mgr.clear(procIds);
assertEquals(1, res.size());