You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by gn...@apache.org on 2023/01/06 13:42:14 UTC
[maven-mvnd] 17/28: Support redirecting input from client to daemon, #541 (#581)
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch mvnd-0.9.x
in repository https://gitbox.apache.org/repos/asf/maven-mvnd.git
commit c9dec667231507d1c14490c6b4630ad32780bdf2
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Tue Dec 13 23:59:35 2022 +0100
Support redirecting input from client to daemon, #541 (#581)
The implementation currently enables the redirection lazily: it is turned on only when the daemon actually starts reading from the System.in stream, i.e. on a call to InputStream.read() or InputStream.available().
---
.../java/org/mvndaemon/mvnd/common/Message.java | 76 ++++++++++++++++++++++
.../mvnd/common/logging/TerminalOutput.java | 31 ++++++++-
.../java/org/mvndaemon/mvnd/daemon/Server.java | 74 +++++++++++++++++++++
3 files changed, 178 insertions(+), 3 deletions(-)
diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/Message.java b/common/src/main/java/org/mvndaemon/mvnd/common/Message.java
index 10b641e5..f274f940 100644
--- a/common/src/main/java/org/mvndaemon/mvnd/common/Message.java
+++ b/common/src/main/java/org/mvndaemon/mvnd/common/Message.java
@@ -62,6 +62,8 @@ public abstract class Message {
public static final int EXECUTION_FAILURE = 24;
public static final int PRINT_OUT = 25;
public static final int PRINT_ERR = 26;
+ public static final int REQUEST_INPUT = 27;
+ public static final int INPUT_DATA = 28;
final int type;
@@ -115,6 +117,10 @@ public abstract class Message {
case PRINT_OUT:
case PRINT_ERR:
return StringMessage.read(type, input);
+ case REQUEST_INPUT:
+ return RequestInput.read(input);
+ case INPUT_DATA:
+ return InputData.read(input);
}
throw new IllegalStateException("Unexpected message type: " + type);
}
@@ -137,6 +143,8 @@ public abstract class Message {
case DISPLAY:
case PRINT_OUT:
case PRINT_ERR:
+ case REQUEST_INPUT:
+ case INPUT_DATA:
return 2;
case PROJECT_STARTED:
return 3;
@@ -1036,6 +1044,66 @@ public abstract class Message {
}
}
+ public static class RequestInput extends Message {
+
+ private String projectId;
+
+ public static RequestInput read(DataInputStream input) throws IOException {
+ String projectId = readUTF(input);
+ return new RequestInput(projectId);
+ }
+
+ public RequestInput(String projectId) {
+ super(REQUEST_INPUT);
+ this.projectId = projectId;
+ }
+
+ public String getProjectId() {
+ return projectId;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestInput{" + "projectId='" + projectId + '\'' + '}';
+ }
+
+ @Override
+ public void write(DataOutputStream output) throws IOException {
+ super.write(output);
+ writeUTF(output, projectId);
+ }
+ }
+
+ public static class InputData extends Message {
+
+ final String data;
+
+ public static Message read(DataInputStream input) throws IOException {
+ String data = readUTF(input);
+ return new InputData(data);
+ }
+
+ private InputData(String data) {
+ super(INPUT_DATA);
+ this.data = data;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ @Override
+ public String toString() {
+ return "InputResponse{" + "data='" + data + "\'" + '}';
+ }
+
+ @Override
+ public void write(DataOutputStream output) throws IOException {
+ super.write(output);
+ writeUTF(output, data);
+ }
+ }
+
public int getType() {
return type;
}
@@ -1048,6 +1116,14 @@ public abstract class Message {
return new StringMessage(DISPLAY, message);
}
+ public static RequestInput requestInput(String projectId) {
+ return new RequestInput(projectId);
+ }
+
+ public static InputData inputResponse(String data) {
+ return new InputData(data);
+ }
+
public static StringMessage out(String message) {
return new StringMessage(PRINT_OUT, message);
}
diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java
index e5e5edaf..a7e024ef 100644
--- a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java
+++ b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java
@@ -53,6 +53,7 @@ import org.mvndaemon.mvnd.common.Message.BuildStarted;
import org.mvndaemon.mvnd.common.Message.ExecutionFailureEvent;
import org.mvndaemon.mvnd.common.Message.MojoStartedEvent;
import org.mvndaemon.mvnd.common.Message.ProjectEvent;
+import org.mvndaemon.mvnd.common.Message.RequestInput;
import org.mvndaemon.mvnd.common.Message.StringMessage;
import org.mvndaemon.mvnd.common.Message.TransferEvent;
import org.mvndaemon.mvnd.common.OsUtils;
@@ -112,6 +113,8 @@ public class TerminalOutput implements ClientOutput {
private volatile Consumer<Message> daemonDispatch;
/** A sink for queuing messages to the main queue */
private volatile Consumer<Message> daemonReceive;
+ /** The project id which is trying to read the input stream */
+ private volatile String projectReadingInput;
/*
* The following non-final fields are read/written from the main thread only.
@@ -441,6 +444,15 @@ public class TerminalOutput implements ClientOutput {
failures.add(efe);
break;
}
+ case Message.REQUEST_INPUT: {
+ RequestInput ri = (RequestInput) entry;
+ projectReadingInput = ri.getProjectId();
+ break;
+ }
+ case Message.INPUT_DATA: {
+ daemonDispatch.accept(entry);
+ break;
+ }
default:
throw new IllegalStateException("Unexpected message " + entry);
}
@@ -480,7 +492,21 @@ public class TerminalOutput implements ClientOutput {
try {
while (!closing) {
if (readInput.readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
- try {
+ if (projectReadingInput != null) {
+ char[] buf = new char[256];
+ int idx = 0;
+ while (idx < buf.length) {
+ int c = terminal.reader().read(idx > 0 ? 1 : 10);
+ if (c < 0) {
+ break;
+ }
+ buf[idx++] = (char) c;
+ }
+ if (idx > 0) {
+ String data = String.valueOf(buf, 0, idx);
+ daemonReceive.accept(Message.inputResponse(data));
+ }
+ } else {
int c = terminal.reader().read(10);
if (c == -1) {
break;
@@ -488,9 +514,8 @@ public class TerminalOutput implements ClientOutput {
if (c == KEY_PLUS || c == KEY_MINUS || c == KEY_CTRL_L || c == KEY_CTRL_M || c == KEY_CTRL_B) {
daemonReceive.accept(Message.keyboardInput((char) c));
}
- } finally {
- readInput.readLock().unlock();
}
+ readInput.readLock().unlock();
}
}
} catch (InterruptedException e) {
diff --git a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java
index 7d3da541..d16483c8 100644
--- a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java
+++ b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java
@@ -25,18 +25,23 @@ import static org.mvndaemon.mvnd.common.DaemonState.StopRequested;
import static org.mvndaemon.mvnd.common.DaemonState.Stopped;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
@@ -47,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.maven.cli.DaemonMavenCli;
@@ -482,6 +488,8 @@ public class Server implements AutoCloseable, Runnable {
final BlockingQueue<Message> sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator());
final BlockingQueue<Message> recvQueue = new LinkedBlockingDeque<>();
final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue);
+ final DaemonInputStream daemonInputStream =
+ new DaemonInputStream(projectId -> sendQueue.add(Message.requestInput(projectId)));
try (ProjectBuildLogAppender logAppender = new ProjectBuildLogAppender(buildEventListener)) {
LOGGER.info("Executing request");
@@ -529,6 +537,8 @@ public class Server implements AutoCloseable, Runnable {
if (message == Message.BareMessage.CANCEL_BUILD_SINGLETON) {
updateState(Canceled);
return;
+ } else if (message instanceof Message.InputData) {
+ daemonInputStream.addInputData(((Message.InputData) message).getData());
} else {
synchronized (recvQueue) {
recvQueue.put(message);
@@ -581,6 +591,7 @@ public class Server implements AutoCloseable, Runnable {
}
}
});
+ System.setIn(daemonInputStream);
System.setOut(new LoggingOutputStream(s -> sendQueue.add(Message.out(s))).printStream());
System.setErr(new LoggingOutputStream(s -> sendQueue.add(Message.err(s))).printStream());
int exitCode = cli.main(
@@ -650,4 +661,67 @@ public class Server implements AutoCloseable, Runnable {
public String toString() {
return info.toString();
}
+
+ static class DaemonInputStream extends InputStream {
+ private final Consumer<String> startReadingFromProject;
+ private final LinkedList<byte[]> datas = new LinkedList<>();
+ private int pos = -1;
+ private String projectReading = null;
+
+ DaemonInputStream(Consumer<String> startReadingFromProject) {
+ this.startReadingFromProject = startReadingFromProject;
+ }
+
+ @Override
+ public int available() throws IOException {
+ synchronized (datas) {
+ String projectId = ProjectBuildLogAppender.getProjectId();
+ if (!Objects.equals(projectId, projectReading)) {
+ projectReading = projectId;
+ startReadingFromProject.accept(projectId);
+ }
+ return datas.stream().mapToInt(a -> a.length).sum() - Math.max(pos, 0);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ synchronized (datas) {
+ String projectId = ProjectBuildLogAppender.getProjectId();
+ if (!Objects.equals(projectId, projectReading)) {
+ projectReading = projectId;
+ startReadingFromProject.accept(projectId);
+ // TODO: start a 10ms timer to turn data off
+ }
+ for (; ; ) {
+ if (datas.isEmpty()) {
+ try {
+ datas.wait();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted");
+ }
+ pos = -1;
+ continue;
+ }
+ byte[] curData = datas.getFirst();
+ if (pos >= curData.length) {
+ datas.removeFirst();
+ pos = -1;
+ continue;
+ }
+ if (pos < 0) {
+ pos = 0;
+ }
+ return curData[pos++];
+ }
+ }
+ }
+
+ public void addInputData(String data) {
+ synchronized (datas) {
+ datas.add(data.getBytes(Charset.forName(System.getProperty("file.encoding"))));
+ datas.notifyAll();
+ }
+ }
+ }
}