Posted to issues@geode.apache.org by "Barry Oglesby (JIRA)" <ji...@apache.org> on 2018/10/30 21:27:00 UTC

[jira] [Assigned] (GEODE-5959) Nested function executions can cause a performance issue

     [ https://issues.apache.org/jira/browse/GEODE-5959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Barry Oglesby reassigned GEODE-5959:
------------------------------------

    Assignee: Barry Oglesby

> Nested function executions can cause a performance issue
> --------------------------------------------------------
>
>                 Key: GEODE-5959
>                 URL: https://issues.apache.org/jira/browse/GEODE-5959
>             Project: Geode
>          Issue Type: Bug
>          Components: functions
>            Reporter: Barry Oglesby
>            Assignee: Barry Oglesby
>            Priority: Major
>
> When a client executes a function, the server does:
> 1. The ServerConnection receives the function request, creates a runnable task and executes it on the thread pool.
> 2a. If there are available threads in the pool, one is used.
> 2b. If there are no available threads in the pool but not all of the pool's threads are in use (the pool is below its maximum size), a new thread is created and used.
> 2c. If there are no available threads in the pool and all the threads are in use, then:
>  - the task is put into a queue (a BlockingQueue)
>  - a thread called Function Execution Processor1 takes the task from that queue and offers it to another queue. This other queue is a SynchronousQueue (an insert waits for a removal), so the offer succeeds only if a pool thread is free to take the task.
>  - after 5 seconds by default (controlled by gemfire.RETRY_INTERVAL), the offer fails and the rejectedExecutionHandler is invoked. This handler spins off a new thread to process the task.
> Once the thread pool reaches the state where no threads are available, every new function execution takes at least 5 seconds plus the time it takes to execute the function.
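The hand-off in steps 1 to 2c can be modeled with plain java.util.concurrent primitives. This is a simplified, hypothetical sketch, not Geode's FunctionExecutionPooledExecutor: every task goes through the backlog queue (so steps 2a and 2b collapse into an immediate hand-off to an idle thread), pool threads are plain workers that take() from the SynchronousQueue, and the rejection fallback is modeled as starting a new daemon thread. The names DispatchSketch, quickTaskDelayMillis and awaitQuietly are illustrative, and a 300 ms retry interval stands in for the 5-second default so the demo runs quickly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical model of the dispatch path in steps 1-2c.
// Not Geode's FunctionExecutionPooledExecutor; every name here is illustrative.
class DispatchSketch {
  final BlockingQueue<Runnable> backlog = new LinkedBlockingQueue<>();  // the first queue in step 2c
  final SynchronousQueue<Runnable> handoff = new SynchronousQueue<>(); // an insert waits for a removal
  final long retryIntervalMillis;                                       // stand-in for gemfire.RETRY_INTERVAL

  DispatchSketch(int poolThreads, long retryIntervalMillis) {
    this.retryIntervalMillis = retryIntervalMillis;
    for (int i = 0; i < poolThreads; i++) {
      startDaemon(() -> {
        try {
          while (true) handoff.take().run(); // pool thread: run whatever is handed off
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    // Single feeder thread, playing the role of "Function Execution Processor1".
    startDaemon(() -> {
      try {
        while (true) {
          Runnable task = backlog.take();
          if (!handoff.offer(task, retryIntervalMillis, TimeUnit.MILLISECONDS)) {
            startDaemon(task); // fallback, like the rejection handler: spin off a new thread
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }

  void submit(Runnable task) {
    backlog.add(task);
  }

  static void startDaemon(Runnable r) {
    Thread t = new Thread(r);
    t.setDaemon(true);
    t.start();
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Milliseconds between submitting a quick task and its start, after first
  // submitting 'blockers' tasks that hold their thread until released.
  static long quickTaskDelayMillis(int poolThreads, int blockers, long retryMillis) {
    DispatchSketch d = new DispatchSketch(poolThreads, retryMillis);
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < blockers; i++) {
      d.submit(() -> awaitQuietly(release));
    }
    AtomicLong startedAt = new AtomicLong();
    CountDownLatch started = new CountDownLatch(1);
    long submittedAt = System.nanoTime();
    d.submit(() -> {
      startedAt.set(System.nanoTime());
      started.countDown();
    });
    awaitQuietly(started);
    release.countDown(); // let the blocking tasks finish
    return TimeUnit.NANOSECONDS.toMillis(startedAt.get() - submittedAt);
  }

  public static void main(String[] args) {
    System.out.println("idle pool:      " + quickTaskDelayMillis(2, 0, 300) + " ms");
    System.out.println("saturated pool: " + quickTaskDelayMillis(2, 2, 300) + " ms");
  }
}
```

With both pool threads held by blocked tasks, the quick task cannot start until the timed offer gives up, so its delay is at least the retry interval; with an idle pool it is handed off almost immediately.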
> For example, if MAX_FE_THREADS is 32 and I run a test that does the following:
> - launch 50 ParentFunctions onRegion against a replicated region, each of which executes a ChildFunction on the same region
> - launch 1000 (or some other number of) functions that execute quickly
> All 32 threads in the pool will be in use immediately. These threads will be processing ParentFunctions that have invoked the ChildFunction and are waiting for its result. Each of the next 18 (making 50) will cause a thread to be spun off after a 5-second wait. These threads will also get stuck waiting for their ChildFunctions to execute. Each of the next 1000 will take 5 seconds to offer and will then spin off a thread that executes quickly. These are all processed sequentially. If a function executes quickly enough, it won't show up in thread dumps.
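Because the single feeder thread handles queued tasks one at a time, the delays in this scenario add up. A rough estimate, assuming the pool stays saturated for the whole run, that the 18 extra ParentFunctions are queued ahead of the 1000 quick functions, and that every offer waits the full default 5000 ms: the n-th queued task starts about n × 5 s after the feeder begins offering, and finishes that long plus its own execution time. BacklogDelay and startDelayMillis are hypothetical names.

```java
// Rough, hypothetical model: the single feeder thread spends one full retry
// interval on each queued task before the fallback thread starts it
// (assumes the pool stays saturated for the whole run).
class BacklogDelay {
  // Approximate delay before the n-th queued task (1-based) starts running.
  static long startDelayMillis(int n, long retryIntervalMillis) {
    return n * retryIntervalMillis;
  }

  public static void main(String[] args) {
    long retry = 5000;           // default retry interval (gemfire.RETRY_INTERVAL)
    int queuedParents = 50 - 32; // ParentFunctions that find no free pool thread
    int quickFunctions = 1000;
    System.out.println(startDelayMillis(queuedParents, retry));                  // prints 90000
    System.out.println(startDelayMillis(queuedParents + quickFunctions, retry)); // prints 5090000
  }
}
```

Under these assumptions the last quick function waits about 5090 seconds, roughly 85 minutes, before it even starts.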
> When the thread pool is in this state, the server will contain threads like the ones below.
> For each client request, there will be a ServerConnection thread waiting for the function execution request to complete here:
> {noformat}
> "ServerConnection on port 62483 Thread 42" #155 daemon prio=5 os_prio=31 tid=0x00007fdf072b6800 nid=0x15703 waiting on condition [0x0000700018bb7000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x00000006c01c1378> (a java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at org.apache.geode.internal.cache.execute.LocalResultCollectorImpl.getResult(LocalResultCollectorImpl.java:110)
>  at org.apache.geode.internal.cache.tier.sockets.command.ExecuteRegionFunction66.cmdExecute(ExecuteRegionFunction66.java:255)
>  at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:178)
>  at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMessage(ServerConnection.java:844)
>  at org.apache.geode.internal.cache.tier.sockets.OriginalServerConnection.doOneMessage(OriginalServerConnection.java:74)
>  at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1214)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.lambda$initializeServerConnectionThreadPool$3(AcceptorImpl.java:593)
>  at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$$Lambda$107/881662115.invoke(Unknown Source)
>  at org.apache.geode.internal.logging.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:121)
>  at org.apache.geode.internal.logging.LoggingThreadFactory$$Lambda$18/49222910.run(Unknown Source)
>  at java.lang.Thread.run(Thread.java:745)
> {noformat}
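The ServerConnection thread above is parked in CountDownLatch.await() inside LocalResultCollectorImpl.getResult(). The pattern behind that frame can be sketched as a collector whose getResult() blocks on a latch that is released only after the last result arrives. This is a hypothetical, simplified class (LatchResultCollector, addResult, endResults and demo are illustrative names), not Geode's implementation; it only shows why a caller waiting on an unfinished function execution appears as WAITING (parking) on CountDownLatch$Sync.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical latch-based result collector; not Geode's LocalResultCollectorImpl.
class LatchResultCollector<T> {
  private final List<T> results = new ArrayList<>();
  private final CountDownLatch lastResultReceived = new CountDownLatch(1);

  synchronized void addResult(T result) {
    results.add(result);
  }

  // Called by the executing side once the final result has been added.
  void endResults() {
    lastResultReceived.countDown();
  }

  // Blocks the caller (e.g. a ServerConnection thread) until endResults() runs;
  // a thread dump shows this wait as WAITING (parking) on CountDownLatch$Sync.
  List<T> getResult() throws InterruptedException {
    lastResultReceived.await();
    synchronized (this) {
      return new ArrayList<>(results);
    }
  }

  // Usage: a separate "execution" thread produces two results, then ends.
  static List<String> demo() {
    LatchResultCollector<String> collector = new LatchResultCollector<>();
    new Thread(() -> {
      collector.addResult("a");
      collector.addResult("b");
      collector.endResults();
    }).start();
    try {
      return collector.getResult();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

If endResults() is never called, for instance because the work that would produce the last result is itself stuck waiting for a thread, getResult() never returns.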
> There will be a corresponding Function Execution Processor thread in the middle of executing the parent function and waiting for the child function execution to complete:
> {noformat}
> "Function Execution Processor12" #158 daemon prio=5 os_prio=31 tid=0x00007fdf072a6000 nid=0xd707 waiting on condition [0x0000700014af7000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x00000006c05783e8> (a java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at org.apache.geode.internal.cache.execute.LocalResultCollectorImpl.getResult(LocalResultCollectorImpl.java:110)
>  at ParentFunction.execute(TestParentFunction.java:20)
>  at org.apache.geode.internal.cache.execute.AbstractExecution.executeFunctionLocally(AbstractExecution.java:331)
>  at org.apache.geode.internal.cache.execute.AbstractExecution$2.run(AbstractExecution.java:300)
>  at org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor$1$1.run(FunctionExecutionPooledExecutor.java:151)
>  at org.apache.geode.distributed.internal.ClusterDistributionManager.runUntilShutdown(ClusterDistributionManager.java:952)
>  at org.apache.geode.distributed.internal.ClusterDistributionManager.doFunctionExecutionThread(ClusterDistributionManager.java:806)
>  at org.apache.geode.distributed.internal.ClusterDistributionManager$$Lambda$28/795011696.invoke(Unknown Source)
>  at org.apache.geode.internal.logging.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:121)
>  at org.apache.geode.internal.logging.LoggingThreadFactory$$Lambda$18/49222910.run(Unknown Source)
>  at java.lang.Thread.run(Thread.java:745)
> {noformat}
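The two thread dumps above show the core of the problem: a parent function holds a pool thread while it waits for a child function that needs a thread from the same pool. The same starvation can be reproduced with a plain fixed-size ExecutorService. This is a hypothetical model, not Geode's function service: completedParents and the gate latch are illustrative, and the gate exists only to make the demo deterministic by ensuring every parent has been submitted before any child is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of nested execution on one bounded pool; not Geode code.
class NestedStarvation {
  // Runs 'parents' parent tasks, each of which submits a child task to the same
  // pool and blocks on its result; returns how many parents finish in time.
  static int completedParents(int poolThreads, int parents, long timeoutMillis) {
    ExecutorService pool = Executors.newFixedThreadPool(poolThreads);
    CountDownLatch gate = new CountDownLatch(1); // children are submitted only after all parents
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 0; i < parents; i++) {
      futures.add(pool.submit(() -> {
        gate.await();
        Future<String> child = pool.submit(() -> "child result"); // nested execution
        return child.get(); // the parent keeps its pool thread while it waits
      }));
    }
    gate.countDown();

    int completed = 0;
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    for (Future<String> parent : futures) {
      try {
        parent.get(Math.max(0, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
        completed++;
      } catch (TimeoutException e) {
        // Still blocked: its child is queued behind parents holding every thread.
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
    pool.shutdownNow(); // interrupts the blocked parents and discards queued children
    return completed;
  }

  public static void main(String[] args) {
    System.out.println(completedParents(2, 2, 500));  // prints 0: every thread waits on a queued child
    System.out.println(completedParents(3, 2, 2000)); // prints 2: a spare thread runs the children
  }
}
```

With as many parents as threads, no thread is left to run the children, so every parent times out; one spare thread is enough for both parents to finish.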
> One Function Execution Processor (called Function Execution Processor1) will be offering a task to the SynchronousQueue here:
> {noformat}
> "Function Execution Processor1" #20 daemon prio=5 os_prio=31 tid=0x00007fdf013bf800 nid=0x6d03 waiting on condition [0x0000700011352000]
>  java.lang.Thread.State: TIMED_WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x00000006e9b364b0> (a java.util.concurrent.SynchronousQueue$TransferStack)
>  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>  at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>  at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>  at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>  at org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor$2.run(FunctionExecutionPooledExecutor.java:194)
>  at org.apache.geode.distributed.internal.ClusterDistributionManager.runUntilShutdown(ClusterDistributionManager.java:952)
>  at org.apache.geode.distributed.internal.ClusterDistributionManager.doFunctionExecutionThread(ClusterDistributionManager.java:806)
>  at org.apache.geode.distributed.internal.ClusterDistributionManager$$Lambda$28/795011696.invoke(Unknown Source)
>  at org.apache.geode.internal.logging.LoggingThreadFactory.lambda$newThread$0(LoggingThreadFactory.java:121)
>  at org.apache.geode.internal.logging.LoggingThreadFactory$$Lambda$18/49222910.run(Unknown Source)
>  at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This thread stays in this state for 5 seconds, after which the offer fails and the RejectedExecutionHandler spins off a new thread to execute the task.
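The TIMED_WAITING frame in SynchronousQueue.offer is a timed hand-off: SynchronousQueue.offer(e, timeout, unit) returns true only if another thread takes the element within the timeout, and otherwise returns false once the timeout has elapsed. The small standalone demo below shows both outcomes; HandoffDemo and timedOffer are illustrative names, and 200 ms stands in for the 5-second default.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Illustrates the SynchronousQueue behavior seen in the TIMED_WAITING thread above.
class HandoffDemo {
  static final class Result {
    final boolean accepted;
    final long elapsedMillis;

    Result(boolean accepted, long elapsedMillis) {
      this.accepted = accepted;
      this.elapsedMillis = elapsedMillis;
    }
  }

  // Performs one timed offer, optionally with a consumer thread ready to take().
  static Result timedOffer(boolean consumerWaiting, long timeoutMillis) {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    if (consumerWaiting) {
      Thread consumer = new Thread(() -> {
        try {
          queue.take();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      consumer.setDaemon(true);
      consumer.start();
    }
    long start = System.nanoTime();
    try {
      boolean accepted = queue.offer("task", timeoutMillis, TimeUnit.MILLISECONDS);
      return new Result(accepted, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Result noConsumer = timedOffer(false, 200);
    System.out.println(noConsumer.accepted + " after " + noConsumer.elapsedMillis + " ms");
    Result withConsumer = timedOffer(true, 200);
    System.out.println(withConsumer.accepted + " after " + withConsumer.elapsedMillis + " ms");
  }
}
```

Without a waiting consumer the offer always burns the full timeout before failing, which is exactly the per-task cost the feeder thread pays once every pool thread is busy.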
> Added logging shows this behavior:
> {noformat}
> [warning 2018/10/30 14:03:23.668 PDT <Function Execution Processor1> tid=0x14] FunctionExecutionPooledExecutor.run took task=org.apache.geode.internal.cache.execute.AbstractExecution$2@392e98bf
> [warning 2018/10/30 14:03:23.669 PDT <Function Execution Processor1> tid=0x14] FunctionExecutionPooledExecutor.run about to offer task=org.apache.geode.internal.cache.execute.AbstractExecution$2@392e98bf
> [warning 2018/10/30 14:03:28.670 PDT <Function Execution Processor1> tid=0x14] FunctionExecutionPooledExecutor.run unsuccessfully offered task=org.apache.geode.internal.cache.execute.AbstractExecution$2@392e98bf
> [warning 2018/10/30 14:03:28.670 PDT <Function Execution Processor1> tid=0x14] An additional Function Execution Processor thread is being launched because all 10 thread pool threads are in use for greater than 5000 ms
> {noformat}
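The timestamps in this log confirm the wait: the offer starts at 14:03:23.669 and is reported as unsuccessful at 14:03:28.670. A quick check with java.time (OfferGap and gapMillis are illustrative names; the timestamps are copied from the log lines) gives 5001 ms, consistent with the 5000 ms interval in the last log line.

```java
import java.time.Duration;
import java.time.LocalTime;

// Computes the gap between two log timestamps of the form HH:mm:ss.SSS.
class OfferGap {
  static long gapMillis(String from, String to) {
    return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
  }

  public static void main(String[] args) {
    // "about to offer" -> "unsuccessfully offered"
    System.out.println(gapMillis("14:03:23.669", "14:03:28.670")); // prints 5001
  }
}
```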



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)