You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brooklyn.apache.org by "Aled Sage (JIRA)" <ji...@apache.org> on 2017/02/17 23:45:44 UTC

[jira] [Created] (BROOKLYN-440) More efficient thread usage for ssh execution

Aled Sage created BROOKLYN-440:
----------------------------------

             Summary: More efficient thread usage for ssh execution
                 Key: BROOKLYN-440
                 URL: https://issues.apache.org/jira/browse/BROOKLYN-440
             Project: Brooklyn
          Issue Type: Improvement
    Affects Versions: 0.10.0
            Reporter: Aled Sage
            Priority: Minor


For consuming the stdout/stderr from ssh execution, our use of {{PipedInputStream}}/{{PipedOutputStream}} and {{StreamGobbler}} looks very inefficient (my fault - I wrote it originally!)

We normally consume 6 threads per ssh execution:
1. The calling thread of {{SshMachineLocation.execScript}} is blocker, waiting for it to complete.
2. Within sshj, there is a "sftp reader" thread that reads the packets
3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to read the ssh stdout, as it is made available.
4. Same for stderr.
5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a {{StreamGobber}} thread to read + log the ssh stdout, as it is made available.
6. Same for stderr.

I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4 though.

Here is the chain of actions:
* We call something like {{sshMachineLocation.execScript}}. This can include "out" and "err" config, to obtain the exec stdout/stderr.
* {{SshMachineLocation}} wraps the command execution in {{ExecWithLoggingHelpers.execWithLogging}}.
* In order to log the stdout (and same for stderr):
  * It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the {{PipedOutputStream}} as the stdout to use (i.e. config passed to {{SshjTool}})
  * It creates a {{StreamGobbler}}, which is a thread that consumes the {{PipedInputStream}} - this logs each line, and also writes each line to the original "out".
* Within {{SshjTool.ExecAction}}, it has an sshj {{Session.Command.getInputStream()}} and {{.getErrorStream()}} for reading the stdout and stderr.
  It creates a {{StreamGobbler}}, which is a thread to consume these input streams; it writes the bytes received from that input stream to the {{out}} and {{err}} streams passed in.

For the logging, a simpler and more efficient approach would be to wrap the OutputStream. See {{com.google.common.io.CountingOutputStream}} for inspiration. It should be as simple as extending {{java.util.FilterOutputStream}}, and overriding the {{write}} and {{close}} methods. The implementation of these methods would call the wrapped outputStream as well as doing some thing very similar to {{StreamGobbler.onChar()}}.

I don't see a way to do the same trick inside {{SshjTool.ExecAction}}, unfortunately, without changes to sshj to wrap {{ChannelInputStream}} which is created inside {{ net.schmizz.sshj.connection.channel.AbstractChannel}} 's constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of those seem worth it.

Below is a trimmed down jstack from executing {{sleep 100}} over ssh:

{noformat}
2017-02-13 11:09:35
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):

"main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition [0x0000700006b21000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007f4069868> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
    at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
    at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
    at net.schmizz.concurrent.Event.await(Event.java:103)
    at net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
    at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
    at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
    at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
    at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
    at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
    at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
    at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
    at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
    at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
    at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
    at org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
    at org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
    at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
    at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
    at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
    at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
    at org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)

"sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait() [0x0000700008571000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
    at java.lang.Object.wait(Object.java:503)
    at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
    - locked <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
    at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
    at net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
    at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
    at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)

"Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait() [0x0000700008777000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
    at java.lang.Object.wait(Object.java:503)
    at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
    - locked <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
    at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
    - locked <0x00000007f4069930> (a [B)
    at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)

"Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait() [0x0000700008674000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
    at java.lang.Object.wait(Object.java:503)
    at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
    - locked <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
    at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
    - locked <0x00000007f4061680> (a [B)
    at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)

"reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable [0x000070000846e000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at net.schmizz.sshj.transport.Reader.run(Reader.java:50)

"brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000 nid=0x6103 waiting on condition [0x000070000836b000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000007fc0088e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

"Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait() [0x0000700008268000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
    at java.io.PipedInputStream.read(PipedInputStream.java:327)
    - locked <0x00000007fc018088> (a java.io.PipedInputStream)
    at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)

"Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait() [0x0000700008165000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
    at java.io.PipedInputStream.read(PipedInputStream.java:327)
    - locked <0x00000007fc020088> (a java.io.PipedInputStream)
    at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
{noformat}

([~svet] I spoke to you about this previously, and believe you have some comments?)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)