You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brooklyn.apache.org by "Aled Sage (JIRA)" <ji...@apache.org> on 2017/02/17 23:45:44 UTC
[jira] [Created] (BROOKLYN-440) More efficient thread usage for ssh
execution
Aled Sage created BROOKLYN-440:
----------------------------------
Summary: More efficient thread usage for ssh execution
Key: BROOKLYN-440
URL: https://issues.apache.org/jira/browse/BROOKLYN-440
Project: Brooklyn
Issue Type: Improvement
Affects Versions: 0.10.0
Reporter: Aled Sage
Priority: Minor
For consuming the stdout/stderr from ssh execution, our use of {{PipedInputStream}}/{{PipedOutputStream}} and {{StreamGobbler}} looks very inefficient (my fault - I wrote it originally!)
We normally consume 6 threads per ssh execution:
1. The calling thread of {{SshMachineLocation.execScript}} is blocker, waiting for it to complete.
2. Within sshj, there is a "sftp reader" thread that reads the packets
3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to read the ssh stdout, as it is made available.
4. Same for stderr.
5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a {{StreamGobber}} thread to read + log the ssh stdout, as it is made available.
6. Same for stderr.
I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4 though.
Here is the chain of actions:
* We call something like {{sshMachineLocation.execScript}}. This can include "out" and "err" config, to obtain the exec stdout/stderr.
* {{SshMachineLocation}} wraps the command execution in {{ExecWithLoggingHelpers.execWithLogging}}.
* In order to log the stdout (and same for stderr):
* It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the {{PipedOutputStream}} as the stdout to use (i.e. config passed to {{SshjTool}})
* It creates a {{StreamGobbler}}, which is a thread that consumes the {{PipedInputStream}} - this logs each line, and also writes each line to the original "out".
* Within {{SshjTool.ExecAction}}, it has an sshj {{Session.Command.getInputStream()}} and {{.getErrorStream()}} for reading the stdout and stderr.
It creates a {{StreamGobbler}}, which is a thread to consume these input streams; it writes the bytes received from that input stream to the {{out}} and {{err}} streams passed in.
For the logging, a simpler and more efficient approach would be to wrap the OutputStream. See {{com.google.common.io.CountingOutputStream}} for inspiration. It should be as simple as extending {{java.util.FilterOutputStream}}, and overriding the {{write}} and {{close}} methods. The implementation of these methods would call the wrapped outputStream as well as doing some thing very similar to {{StreamGobbler.onChar()}}.
I don't see a way to do the same trick inside {{SshjTool.ExecAction}}, unfortunately, without changes to sshj to wrap {{ChannelInputStream}} which is created inside {{ net.schmizz.sshj.connection.channel.AbstractChannel}} 's constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of those seem worth it.
Below is a trimmed down jstack from executing {{sleep 100}} over ssh:
{noformat}
2017-02-13 11:09:35
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):
"main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition [0x0000700006b21000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007f4069868> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
at net.schmizz.concurrent.Event.await(Event.java:103)
at net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
at org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
at org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
at org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)
"sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait() [0x0000700008571000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
at java.lang.Object.wait(Object.java:503)
at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
- locked <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
at net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)
"Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait() [0x0000700008777000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
at java.lang.Object.wait(Object.java:503)
at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
- locked <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
- locked <0x00000007f4069930> (a [B)
at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
"Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait() [0x0000700008674000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
at java.lang.Object.wait(Object.java:503)
at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
- locked <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
- locked <0x00000007f4061680> (a [B)
at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
"reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable [0x000070000846e000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at net.schmizz.sshj.transport.Reader.run(Reader.java:50)
"brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000 nid=0x6103 waiting on condition [0x000070000836b000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007fc0088e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
"Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait() [0x0000700008268000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
at java.io.PipedInputStream.read(PipedInputStream.java:327)
- locked <0x00000007fc018088> (a java.io.PipedInputStream)
at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
"Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait() [0x0000700008165000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
at java.io.PipedInputStream.read(PipedInputStream.java:327)
- locked <0x00000007fc020088> (a java.io.PipedInputStream)
at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
{noformat}
([~svet] I spoke to you about this previously, and believe you have some comments?)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)