Posted to user@flink.apache.org by Jared Stehler <ja...@intellifylearning.com> on 2018/01/11 15:07:23 UTC

class loader issues when closing streams

I’m seeing sporadic issues where Curator (or other) user threads appear to be left running after a stream shutdown. The user class loader then goes away and I get spammed with ClassNotFoundExceptions. Could the UserClassLoader be getting shut down before close is invoked on all operators?

Here’s a stack trace I see from an attempt at closing an Elasticsearch sink:

java.lang.ClassNotFoundException: com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
    at com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
    at com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
    at com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
    at com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
    at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
    at com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
    at com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
    at com.intellify.flink.shared.elasticsearch.LiveResolvingEs1ApiCallBridge.cleanup(LiveResolvingEs1ApiCallBridge.java:105)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:323)
    at com.intellify.flink.shared.tracer.TracingSink.close(TracingSink.java:50)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

I’m using a Curator connection for Archaius and closing it in the call bridge’s cleanup method. To make sure I’m not reaching up into the parent class loader, I’m shading Curator and ZooKeeper.

I also see the following on repeat in my task manager log:

2018-01-11 14:53:13.313 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x3c00002d8a7603de for server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
	at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
	at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
	at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)


Does anyone have any insight into what might be happening here? Does this seem like I’m not closing a thread properly, or something else entirely?
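
One way to rule out the "not closing a thread properly" case is to make close() wait until the client's background threads have actually exited, even when the calling task thread has already been interrupted by cancellation. The following is only a minimal sketch under assumptions: BackgroundClient is a hypothetical stand-in for a client that owns its own threads (it is not Curator's or ZooKeeper's API), and the 5 second bound is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a client that owns a background thread. */
final class BackgroundClient implements AutoCloseable {

    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "background-client-worker");
        t.setDaemon(true);
        return t;
    });

    BackgroundClient() {
        // Simulates a reconnect loop that keeps running until it is interrupted.
        worker.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the loop.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    @Override
    public void close() {
        // Interrupt the worker and reject new work.
        worker.shutdownNow();

        // If the caller was already interrupted, awaitTermination would throw
        // immediately. Clear the flag here and restore it in the finally block.
        boolean interrupted = Thread.interrupted();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("worker thread still running after close()");
            }
        } catch (InterruptedException e) {
            interrupted = true;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** True once the worker thread has exited. */
    boolean isTerminated() {
        return worker.isTerminated();
    }
}
```

Whether a given third-party client offers an equivalent "wait until my threads are gone" hook depends on that client; if it does not, a thread may still outlive close() and later try to load classes from a class loader that is no longer usable.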


--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703




Re: class loader issues when closing streams

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Jared,

I currently don't have a solid idea of what may be happening, but from the
stack dump you provided, it seems like the client connection you are using
in the Elasticsearch API call bridge is stuck, even after the cleanup.

Do you think there could be some issue with closing the client you are
using, and there could be some unclosed thread leak?

Cheers,
Gordon




Re: class loader issues when closing streams

Posted by Jared Stehler <ja...@intellifylearning.com>.
Here’s a more complete view of the task manager log from the start of this occurrence:

2018-01-11 14:50:08.286 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-54-205.us-west-2.compute.internal:2181)] INFO  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Unable to read additional data from server sessionid 0x3c00002d8a7603de, likely server has closed socket, closing socket connection and attempting reconnect
2018-01-11 14:50:08.388 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)-EventThread] INFO  c.i.f.s.c.o.a.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager requested disconnect: JobManager is no longer the leader
2018-01-11 14:50:08.412 [flink-akka.actor.default-dispatcher-16] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Cancelling all computations and discarding all cached data.
2018-01-11 14:50:08.450 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.452 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.taskmanager.Task  - Sink: ES (5/10) (1a2951add18548188742e85d98da271f) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager: JobManager requested disconnect: JobManager is no longer the leader
        at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1073)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:314)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2018-01-11 14:50:08.486 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.taskmanager.Task  - Triggering cancellation of task code Sink: ES (5/10) (1a2951add18548188742e85d98da271f).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.taskmanager.Task  - Attempting to fail task externally Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595).
2018-01-11 14:50:08.506 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.taskmanager.Task  - Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595) switched from RUNNING to FAILED.

...

2018-01-11 14:50:08.550 [Sink: Kafka (5/10)] INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-01-11 14:50:08.553 [Sink: Kafka (5/10)] ERROR org.apache.kafka.clients.producer.KafkaProducer  - Interrupted while joining ioThread
java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1260)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at com.intellify.flink.crusher.executor.sink.TracingSourceRecordSink.close(TracingSourceRecordSink.java:67)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

2018-01-11 14:50:08.639 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)] INFO  c.i.flink.shared.config.SharedIntellifyConfigProvider  - Closing archaius initializer
2018-01-11 14:50:08.741 [Sink: ES (5/10)] INFO  c.i.flink.shared.config.SharedIntellifyConfigProvider  - Closing archaius initializer

2018-01-11 14:50:08.873 [Sink: Kafka (5/10)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask  - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at com.intellify.flink.crusher.executor.sink.TracingSourceRecordSink.close(TracingSourceRecordSink.java:67)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: null
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1260)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
        ... 10 common frames omitted

2018-01-11 14:50:08.924 [flink-akka.actor.default-dispatcher-16] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Disassociating from JobManager

2018-01-11 14:50:08.955 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask  - Error during disposal of stream operator.
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/curator/org/apache/curator/utils/CloseableUtils
        at com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
        at com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
        at com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
        at com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
        at com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
        at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
        at com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
        at com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
        at com.intellify.flink.shared.redis.RedisFactory.close(RedisFactory.java:56)
        at com.intellify.flink.crusher.executor.process.DuplicateFilterFunction.close(DuplicateFilterFunction.java:90)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 16 common frames omitted

2018-01-11 14:50:08.962 [Curator-Framework-0] INFO  c.i.f.s.c.o.a.curator.framework.imps.CuratorFrameworkImpl  - backgroundOperationsLoop exiting

2018-01-11 14:50:09.068 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB cache
2018-01-11 14:50:09.070 [Sink: Kafka (5/10)] INFO  org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem streams are closed for task Sink: Kafka (5/10) (71bebe47ce524c0d535845b1e4d9c595) [FAILED]
2018-01-11 14:50:09.070 [Sink: StagingLake (5/10)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask  - Could not shut down timer service
java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

2018-01-11 14:50:09.146 [flink-akka.actor.default-dispatcher-16] INFO  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB cache
2018-01-11 14:50:09.200 [flink-akka.actor.default-dispatcher-16] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Trying to register at JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-01-11 14:50:09.313 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] INFO  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181
2018-01-11 14:50:09.314 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] INFO  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, initiating session
2018-01-11 14:50:09.315 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x3c00002d8a7603de for server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
Caused by: java.lang.ClassNotFoundException: com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.proto.SetWatches
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 3 common frames omitted

2018-01-11 14:50:09.329 [flink-akka.actor.default-dispatcher-15] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Trying to register at JobManager akka.tcp://flink@10.80.54.126:31024/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Successful registration at JobManager (akka.tcp://flink@10.80.54.126:31024/user/jobmanager), starting network stack and library cache.
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO  o.a.flink.mesos.runtime.clusterframework.MesosTaskManager  - Determined BLOB server address to be /10.80.54.126:31025. Starting BLOB cache.
2018-01-11 14:50:09.341 [flink-akka.actor.default-dispatcher-15] INFO  org.apache.flink.runtime.blob.PermanentBlobCache  - Created BLOB cache storage directory tmp/blobStore-078da52f-864b-458a-9c3b-d07f0e0a3b30
2018-01-11 14:50:09.342 [flink-akka.actor.default-dispatcher-15] INFO  org.apache.flink.runtime.blob.TransientBlobCache  - Created BLOB cache storage directory tmp/blobStore-8e5f0854-215d-4718-b13e-8f77f0f7af51

2018-01-11 14:50:10.272 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)] INFO  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181
2018-01-11 14:50:10.273 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)] INFO  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181, initiating session
2018-01-11 14:50:10.273 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)] WARN  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x3c00002d8a7603de for server ip-10-80-52-23.us-west-2.compute.internal/10.80.52.23:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)








Re: class loader issues when closing streams

Posted by Jared Stehler <ja...@intellifylearning.com>.
As another data point, here’s an excerpt from a stack dump for the task manager:

"heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (5/10)-EventThread" #94 daemon prio=5 os_prio=0 tid=0x00007f48c04d4800 nid=0x68ef waiting on condition [0x00007f48470eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000cd6121c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
        - None

"heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-52-23.us-west-2.compute.internal:2181)" #93 daemon prio=5 os_prio=0 tid=0x00007f48c04e1800 nid=0x68ee waiting on condition [0x00007f48471ec000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1051)

   Locked ownable synchronizers:
        - None

"Sink: ES (5/10)-EventThread" #68 daemon prio=5 os_prio=0 tid=0x00007f48c80a4800 nid=0x6829 waiting on condition [0x00007f4851e08000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000cc4f7950> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
        - None

"Sink: ES (ip-10-80-53-99.us-west-2.compute.internal:2181)" #67 daemon prio=5 os_prio=0 tid=0x00007f48c80aa000 nid=0x6827 runnable [0x00007f4851f09000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x00000000cc4f7c18> (a sun.nio.ch.Util$3)
        - locked <0x00000000cc4f7c08> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000cc4f7bc0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
        at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

   Locked ownable synchronizers:
        - None
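
The threads in the dump above carry the task names ("Sink: ES ...", "heartbeat-filter -> ...") and are still parked inside the shaded ZooKeeper ClientCnxn after shutdown. One way to check for this kind of leftover thread from inside a job is sketched below. It assumes the leaked threads inherited the user-code class loader as their context class loader, which this thread does not confirm. LeakedThreadCheck and findThreadsUsing are hypothetical names, not part of Flink, Curator, or ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink, Curator, or ZooKeeper.
class LeakedThreadCheck {

    // Returns every live thread whose context class loader is exactly `loader`.
    // Assumption: threads started by user code inherit the creating thread's
    // context class loader, so leaked client threads would still point at it.
    static List<Thread> findThreadsUsing(ClassLoader loader) {
        List<Thread> matches = new ArrayList<>();
        // getAllStackTraces() returns a snapshot keyed by the threads alive at call time.
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getContextClassLoader() == loader) {
                matches.add(t);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Standalone demo: list the threads that share the main thread's loader.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (Thread t : findThreadsUsing(loader)) {
            System.out.println(t.getName() + " state=" + t.getState());
        }
    }
}
```

In a sink, this could be called at the end of close() with the loader from getRuntimeContext().getUserCodeClassLoader(), logging any thread other than the current task thread. Anything it reports is a candidate for the threads that later fail with ClassNotFoundException.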

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703



> On Jan 11, 2018, at 10:07 AM, Jared Stehler <ja...@intellifylearning.com> wrote:
> 
> I’m seeing sporadic issues where it appears that curator (or other) user threads are left running after a stream shutdown, and then the user class loader goes away and I get spammed with ClassNotFoundExceptions… I’m wondering if this might have something to do with perhaps the UserClassLoader being shut down before close is invoked on all operators?
> 
> Here’s a stack trace I see from an attempt at closing an elastic search sink:
> 
> java.lang.ClassNotFoundException: com.intellify.flink.shaded.curator.org.apache.curator.utils.CloseableUtils
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at com.intellify.flink.shaded.curator.org.apache.curator.ConnectionState.close(ConnectionState.java:119)
>     at com.intellify.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.close(CuratorZookeeperClient.java:227)
>     at com.intellify.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.close(CuratorFrameworkImpl.java:381)
>     at com.intellify.config.ManagedCuratorFramework.reallyClose(ManagedCuratorFramework.java:48)
>     at com.intellify.config.ArchaiusInitializer.close(ArchaiusInitializer.java:75)
>     at org.apache.commons.io.IOUtils.closeQuietly(IOUtils.java:303)
>     at com.intellify.flink.shared.config.SharedIntellifyConfigProvider.close(SharedIntellifyConfigProvider.java:56)
>     at com.intellify.flink.shared.config.SerializableLiveProperty.close(SerializableLiveProperty.java:68)
>     at com.intellify.flink.shared.elasticsearch.LiveResolvingEs1ApiCallBridge.cleanup(LiveResolvingEs1ApiCallBridge.java:105)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:323)
>     at com.intellify.flink.shared.tracer.TracingSink.close(TracingSink.java:50)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> 
> I’m using a curator connection for archaius, and closing it in the call bridge’s cleanup method. I’m ensuring that I’m not reaching up into the parent class loader by shading curator and zookeeper.
> 
> I also see the following on repeat in my task manager log:
> 
> 2018-01-11 14:53:13.313 [heartbeat-filter -> input-trace-filter -> filter-inactive-ds -> filter-duplicates (ip-10-80-53-99.us-west-2.compute.internal:2181)] WARN  c.i.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x3c00002d8a7603de for server ip-10-80-53-99.us-west-2.compute.internal/10.80.53.99:2181, unexpected error, closing socket connection and attempting reconnect
> java.lang.NoClassDefFoundError: com/intellify/flink/shaded/zookeeper/org/apache/zookeeper/proto/SetWatches
> 	at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:926)
> 	at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
> 	at com.intellify.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 
> 
> Does anyone have any insight into what might be happening here? Does this seem like I’m not closing a thread properly, or something else entirely?
> 
> 
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703