You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by gn...@apache.org on 2023/01/06 13:42:14 UTC

[maven-mvnd] 17/28: Support redirecting input from client to daemon, #541 (#581)

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch mvnd-0.9.x
in repository https://gitbox.apache.org/repos/asf/maven-mvnd.git

commit c9dec667231507d1c14490c6b4630ad32780bdf2
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Tue Dec 13 23:59:35 2022 +0100

    Support redirecting input from client to daemon, #541 (#581)
    
    The implementation currently switches on the redirection when the daemon actually starts reading the System.in stream using InputStream.read() or InputStream.available().
---
 .../java/org/mvndaemon/mvnd/common/Message.java    | 76 ++++++++++++++++++++++
 .../mvnd/common/logging/TerminalOutput.java        | 31 ++++++++-
 .../java/org/mvndaemon/mvnd/daemon/Server.java     | 74 +++++++++++++++++++++
 3 files changed, 178 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/Message.java b/common/src/main/java/org/mvndaemon/mvnd/common/Message.java
index 10b641e5..f274f940 100644
--- a/common/src/main/java/org/mvndaemon/mvnd/common/Message.java
+++ b/common/src/main/java/org/mvndaemon/mvnd/common/Message.java
@@ -62,6 +62,8 @@ public abstract class Message {
     public static final int EXECUTION_FAILURE = 24;
     public static final int PRINT_OUT = 25;
     public static final int PRINT_ERR = 26;
+    public static final int REQUEST_INPUT = 27;
+    public static final int INPUT_DATA = 28;
 
     final int type;
 
@@ -115,6 +117,10 @@ public abstract class Message {
             case PRINT_OUT:
             case PRINT_ERR:
                 return StringMessage.read(type, input);
+            case REQUEST_INPUT:
+                return RequestInput.read(input);
+            case INPUT_DATA:
+                return InputData.read(input);
         }
         throw new IllegalStateException("Unexpected message type: " + type);
     }
@@ -137,6 +143,8 @@ public abstract class Message {
             case DISPLAY:
             case PRINT_OUT:
             case PRINT_ERR:
+            case REQUEST_INPUT:
+            case INPUT_DATA:
                 return 2;
             case PROJECT_STARTED:
                 return 3;
@@ -1036,6 +1044,66 @@ public abstract class Message {
         }
     }
 
+    public static class RequestInput extends Message {
+
+        private String projectId;
+
+        public static RequestInput read(DataInputStream input) throws IOException {
+            String projectId = readUTF(input);
+            return new RequestInput(projectId);
+        }
+
+        public RequestInput(String projectId) {
+            super(REQUEST_INPUT);
+            this.projectId = projectId;
+        }
+
+        public String getProjectId() {
+            return projectId;
+        }
+
+        @Override
+        public String toString() {
+            return "RequestInput{" + "projectId='" + projectId + '\'' + '}';
+        }
+
+        @Override
+        public void write(DataOutputStream output) throws IOException {
+            super.write(output);
+            writeUTF(output, projectId);
+        }
+    }
+
+    public static class InputData extends Message {
+
+        final String data;
+
+        public static Message read(DataInputStream input) throws IOException {
+            String data = readUTF(input);
+            return new InputData(data);
+        }
+
+        private InputData(String data) {
+            super(INPUT_DATA);
+            this.data = data;
+        }
+
+        public String getData() {
+            return data;
+        }
+
+        @Override
+        public String toString() {
+            return "InputResponse{" + "data='" + data + "\'" + '}';
+        }
+
+        @Override
+        public void write(DataOutputStream output) throws IOException {
+            super.write(output);
+            writeUTF(output, data);
+        }
+    }
+
     public int getType() {
         return type;
     }
@@ -1048,6 +1116,14 @@ public abstract class Message {
         return new StringMessage(DISPLAY, message);
     }
 
+    public static RequestInput requestInput(String projectId) {
+        return new RequestInput(projectId);
+    }
+
+    public static InputData inputResponse(String data) {
+        return new InputData(data);
+    }
+
     public static StringMessage out(String message) {
         return new StringMessage(PRINT_OUT, message);
     }
diff --git a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java
index e5e5edaf..a7e024ef 100644
--- a/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java
+++ b/common/src/main/java/org/mvndaemon/mvnd/common/logging/TerminalOutput.java
@@ -53,6 +53,7 @@ import org.mvndaemon.mvnd.common.Message.BuildStarted;
 import org.mvndaemon.mvnd.common.Message.ExecutionFailureEvent;
 import org.mvndaemon.mvnd.common.Message.MojoStartedEvent;
 import org.mvndaemon.mvnd.common.Message.ProjectEvent;
+import org.mvndaemon.mvnd.common.Message.RequestInput;
 import org.mvndaemon.mvnd.common.Message.StringMessage;
 import org.mvndaemon.mvnd.common.Message.TransferEvent;
 import org.mvndaemon.mvnd.common.OsUtils;
@@ -112,6 +113,8 @@ public class TerminalOutput implements ClientOutput {
     private volatile Consumer<Message> daemonDispatch;
     /** A sink for queuing messages to the main queue */
     private volatile Consumer<Message> daemonReceive;
+    /** The project id which is trying to read the input stream */
+    private volatile String projectReadingInput;
 
     /*
      * The following non-final fields are read/written from the main thread only.
@@ -441,6 +444,15 @@ public class TerminalOutput implements ClientOutput {
                 failures.add(efe);
                 break;
             }
+            case Message.REQUEST_INPUT: {
+                RequestInput ri = (RequestInput) entry;
+                projectReadingInput = ri.getProjectId();
+                break;
+            }
+            case Message.INPUT_DATA: {
+                daemonDispatch.accept(entry);
+                break;
+            }
             default:
                 throw new IllegalStateException("Unexpected message " + entry);
         }
@@ -480,7 +492,21 @@ public class TerminalOutput implements ClientOutput {
         try {
             while (!closing) {
                 if (readInput.readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
-                    try {
+                    if (projectReadingInput != null) {
+                        char[] buf = new char[256];
+                        int idx = 0;
+                        while (idx < buf.length) {
+                            int c = terminal.reader().read(idx > 0 ? 1 : 10);
+                            if (c < 0) {
+                                break;
+                            }
+                            buf[idx++] = (char) c;
+                        }
+                        if (idx > 0) {
+                            String data = String.valueOf(buf, 0, idx);
+                            daemonReceive.accept(Message.inputResponse(data));
+                        }
+                    } else {
                         int c = terminal.reader().read(10);
                         if (c == -1) {
                             break;
@@ -488,9 +514,8 @@ public class TerminalOutput implements ClientOutput {
                         if (c == KEY_PLUS || c == KEY_MINUS || c == KEY_CTRL_L || c == KEY_CTRL_M || c == KEY_CTRL_B) {
                             daemonReceive.accept(Message.keyboardInput((char) c));
                         }
-                    } finally {
-                        readInput.readLock().unlock();
                     }
+                    readInput.readLock().unlock();
                 }
             }
         } catch (InterruptedException e) {
diff --git a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java
index 7d3da541..d16483c8 100644
--- a/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java
+++ b/daemon/src/main/java/org/mvndaemon/mvnd/daemon/Server.java
@@ -25,18 +25,23 @@ import static org.mvndaemon.mvnd.common.DaemonState.StopRequested;
 import static org.mvndaemon.mvnd.common.DaemonState.Stopped;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.SecureRandom;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -47,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.maven.cli.DaemonMavenCli;
@@ -482,6 +488,8 @@ public class Server implements AutoCloseable, Runnable {
         final BlockingQueue<Message> sendQueue = new PriorityBlockingQueue<>(64, Message.getMessageComparator());
         final BlockingQueue<Message> recvQueue = new LinkedBlockingDeque<>();
         final BuildEventListener buildEventListener = new ClientDispatcher(sendQueue);
+        final DaemonInputStream daemonInputStream =
+                new DaemonInputStream(projectId -> sendQueue.add(Message.requestInput(projectId)));
         try (ProjectBuildLogAppender logAppender = new ProjectBuildLogAppender(buildEventListener)) {
 
             LOGGER.info("Executing request");
@@ -529,6 +537,8 @@ public class Server implements AutoCloseable, Runnable {
                         if (message == Message.BareMessage.CANCEL_BUILD_SINGLETON) {
                             updateState(Canceled);
                             return;
+                        } else if (message instanceof Message.InputData) {
+                            daemonInputStream.addInputData(((Message.InputData) message).getData());
                         } else {
                             synchronized (recvQueue) {
                                 recvQueue.put(message);
@@ -581,6 +591,7 @@ public class Server implements AutoCloseable, Runnable {
                         }
                     }
                 });
+                System.setIn(daemonInputStream);
                 System.setOut(new LoggingOutputStream(s -> sendQueue.add(Message.out(s))).printStream());
                 System.setErr(new LoggingOutputStream(s -> sendQueue.add(Message.err(s))).printStream());
                 int exitCode = cli.main(
@@ -650,4 +661,67 @@ public class Server implements AutoCloseable, Runnable {
     public String toString() {
         return info.toString();
     }
+
+    static class DaemonInputStream extends InputStream {
+        private final Consumer<String> startReadingFromProject;
+        private final LinkedList<byte[]> datas = new LinkedList<>();
+        private int pos = -1;
+        private String projectReading = null;
+
+        DaemonInputStream(Consumer<String> startReadingFromProject) {
+            this.startReadingFromProject = startReadingFromProject;
+        }
+
+        @Override
+        public int available() throws IOException {
+            synchronized (datas) {
+                String projectId = ProjectBuildLogAppender.getProjectId();
+                if (!Objects.equals(projectId, projectReading)) {
+                    projectReading = projectId;
+                    startReadingFromProject.accept(projectId);
+                }
+                return datas.stream().mapToInt(a -> a.length).sum() - Math.max(pos, 0);
+            }
+        }
+
+        @Override
+        public int read() throws IOException {
+            synchronized (datas) {
+                String projectId = ProjectBuildLogAppender.getProjectId();
+                if (!Objects.equals(projectId, projectReading)) {
+                    projectReading = projectId;
+                    startReadingFromProject.accept(projectId);
+                    // TODO: start a 10ms timer to turn data off
+                }
+                for (; ; ) {
+                    if (datas.isEmpty()) {
+                        try {
+                            datas.wait();
+                        } catch (InterruptedException e) {
+                            throw new InterruptedIOException("Interrupted");
+                        }
+                        pos = -1;
+                        continue;
+                    }
+                    byte[] curData = datas.getFirst();
+                    if (pos >= curData.length) {
+                        datas.removeFirst();
+                        pos = -1;
+                        continue;
+                    }
+                    if (pos < 0) {
+                        pos = 0;
+                    }
+                    return curData[pos++];
+                }
+            }
+        }
+
+        public void addInputData(String data) {
+            synchronized (datas) {
+                datas.add(data.getBytes(Charset.forName(System.getProperty("file.encoding"))));
+                datas.notifyAll();
+            }
+        }
+    }
 }