You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brooklyn.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/12 13:11:00 UTC

[jira] [Commented] (BROOKLYN-440) More efficient thread usage for ssh execution

    [ https://issues.apache.org/jira/browse/BROOKLYN-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046538#comment-16046538 ] 

ASF GitHub Bot commented on BROOKLYN-440:
-----------------------------------------

Github user neykov commented on the issue:

    https://github.com/apache/brooklyn-server/pull/731
  
    retest this please


> More efficient thread usage for ssh execution
> ---------------------------------------------
>
>                 Key: BROOKLYN-440
>                 URL: https://issues.apache.org/jira/browse/BROOKLYN-440
>             Project: Brooklyn
>          Issue Type: Improvement
>    Affects Versions: 0.10.0
>            Reporter: Aled Sage
>            Priority: Minor
>
> For consuming the stdout/stderr from ssh execution, our use of {{PipedInputStream}}/{{PipedOutputStream}} and {{StreamGobbler}} looks very inefficient (my fault - I wrote it originally!)
> We normally consume 6 threads per ssh execution:
> 1. The calling thread of {{SshMachineLocation.execScript}} is blocker, waiting for it to complete.
> 2. Within sshj, there is a "sftp reader" thread that reads the packets
> 3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to read the ssh stdout, as it is made available.
> 4. Same for stderr.
> 5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a {{StreamGobber}} thread to read + log the ssh stdout, as it is made available.
> 6. Same for stderr.
> I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4 though.
> Here is the chain of actions:
> * We call something like {{sshMachineLocation.execScript}}. This can include "out" and "err" config, to obtain the exec stdout/stderr.
> * {{SshMachineLocation}} wraps the command execution in {{ExecWithLoggingHelpers.execWithLogging}}.
> * In order to log the stdout (and same for stderr):
>   * It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the {{PipedOutputStream}} as the stdout to use (i.e. config passed to {{SshjTool}})
>   * It creates a {{StreamGobbler}}, which is a thread that consumes the {{PipedInputStream}} - this logs each line, and also writes each line to the original "out".
> * Within {{SshjTool.ExecAction}}, it has an sshj {{Session.Command.getInputStream()}} and {{.getErrorStream()}} for reading the stdout and stderr.
>   It creates a {{StreamGobbler}}, which is a thread to consume these input streams; it writes the bytes received from that input stream to the {{out}} and {{err}} streams passed in.
> For the logging, a simpler and more efficient approach would be to wrap the OutputStream. See {{com.google.common.io.CountingOutputStream}} for inspiration. It should be as simple as extending {{java.util.FilterOutputStream}}, and overriding the {{write}} and {{close}} methods. The implementation of these methods would call the wrapped outputStream as well as doing some thing very similar to {{StreamGobbler.onChar()}}.
> I don't see a way to do the same trick inside {{SshjTool.ExecAction}}, unfortunately, without changes to sshj to wrap {{ChannelInputStream}} which is created inside {{ net.schmizz.sshj.connection.channel.AbstractChannel}} 's constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of those seem worth it.
> Below is a trimmed down jstack from executing {{sleep 100}} over ssh:
> {noformat}
> 2017-02-13 11:09:35
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):
> "main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition [0x0000700006b21000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007f4069868> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
>     at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
>     at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
>     at net.schmizz.concurrent.Event.await(Event.java:103)
>     at net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
>     at org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
>     at org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
>     at org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)
> "sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait() [0x0000700008571000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f3f15690> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
>     at net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
>     at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
>     at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)
> "Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait() [0x0000700008777000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4069948> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4069930> (a [B)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait() [0x0000700008674000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4061698> (a net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4061680> (a [B)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable [0x000070000846e000]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.read(SocketInputStream.java:152)
>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>     at net.schmizz.sshj.transport.Reader.run(Reader.java:50)
> "brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000 nid=0x6103 waiting on condition [0x000070000836b000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007fc0088e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
>     at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> "Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait() [0x0000700008268000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc018088> (a java.io.PipedInputStream)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait() [0x0000700008165000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc020088> (a java.io.PipedInputStream)
>     at org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> {noformat}
> ([~svet] I spoke to you about this previously, and believe you have some comments?)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)