Posted to dev@qpid.apache.org by "Ken Giusti (JIRA)" <ji...@apache.org> on 2015/11/09 21:29:11 UTC

[jira] [Commented] (QPID-6836) qpid.messaging hangs in child forked process

    [ https://issues.apache.org/jira/browse/QPID-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14997293#comment-14997293 ] 

Ken Giusti commented on QPID-6836:
----------------------------------

I've looked into this a bit.  Running the reproducer, I saw two types of failures:

1) forked child hangs on a lock

This is infrequent, but it does happen if one of the parent's threads is holding a lock at the moment the fork occurs. The lock's state (locked) is copied to the child, but only the thread that called fork() is copied, not the thread that owns the lock. Nothing in the child will ever release the lock, so the child hangs waiting on it. This is a common problem when forking a process that uses a multi-threaded library like python-qpid.
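
The lock problem can be reproduced without qpid at all. The sketch below is a minimal, hypothetical illustration (not python-qpid code), assuming a POSIX system and Python 3 for `Lock.acquire(timeout=...)`: a background thread holds a plain `threading.Lock` while the main thread forks, and the child then tries to take the same lock. Recent Python versions may also print a DeprecationWarning about forking a multi-threaded process.

```python
import os
import threading


def child_can_acquire_lock_after_fork(timeout=1.0):
    """Fork while a background thread holds a lock; return True if the
    child manages to acquire that lock within `timeout` seconds."""
    lock = threading.Lock()
    held = threading.Event()      # set once the background thread owns the lock
    release = threading.Event()   # tells the background thread to let go

    def worker():
        with lock:
            held.set()
            release.wait()

    t = threading.Thread(target=worker)
    t.start()
    held.wait()                   # guarantee the lock is held at fork time

    pid = os.fork()
    if pid == 0:
        # Child: only the forking thread exists here, so nobody will ever
        # release the copied, still-locked lock.
        acquired = lock.acquire(timeout=timeout)
        os._exit(0 if acquired else 1)

    # Parent: let the worker finish, then collect the child's verdict.
    release.set()
    t.join()
    _, status = os.waitpid(pid, 0)
    return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
```

On a POSIX system `child_can_acquire_lock_after_fork()` should return `False`: the child's copy of the lock is still marked as held, and the thread that would have released it does not exist in the child.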

2) Connection state duplicated

This one's even harder to solve. The design of the Connection object assumes that its state is shared between the main thread (the application code that uses the Connection to send and receive messages) and the background I/O thread (which sits on the socket, writes output data, and reads input data). For example, the main thread writes encoded data to the Connection's I/O buffers, and the I/O thread consumes it and writes it to the socket. As raw data arrives on the socket, the I/O thread feeds it into the Connection's inbound buffer pool, where messages and other protocol events wait until the main thread can consume them.
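
To make the shared-state design concrete, here is a heavily simplified, hypothetical model of it. `ToyConnection` and its methods are made up for illustration and are not the python-qpid API; a plain list stands in for the socket, and the peer's replies are faked by the I/O thread itself.

```python
import queue
import threading


class ToyConnection:
    """Hypothetical two-thread model: the application thread only touches
    the queues, while a background I/O thread moves data between the
    queues and the 'socket' (here just a list)."""

    def __init__(self):
        self.outbound = queue.Queue()  # filled by app thread, drained by I/O thread
        self.inbound = queue.Queue()   # filled by I/O thread, drained by app thread
        self.wire = []                 # stand-in for the socket
        self._io = threading.Thread(target=self._io_loop, daemon=True)
        self._io.start()

    def _io_loop(self):
        while True:
            data = self.outbound.get()
            if data is None:           # shutdown sentinel
                break
            self.wire.append(data)     # "write to the socket"
            self.inbound.put(b"ack:" + data)  # pretend the peer replied

    def send(self, data):
        self.outbound.put(data)        # the app thread never touches the socket

    def receive(self, timeout=None):
        return self.inbound.get(timeout=timeout)

    def close(self):
        self.outbound.put(None)
        self._io.join()


conn = ToyConnection()
conn.send(b"hello")
reply = conn.receive(timeout=1)
conn.close()
```

Everything the application thread does goes through the two queues; only the I/O thread touches `wire`. That hand-off only works while both threads see the same queue objects.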


But when a fork occurs, the child process gets its own copy of the entire state of the python-qpid library, while the I/O thread stays behind in the parent. After the fork there are two independent copies of every data object that the main thread shares with the I/O thread: the child's copy, and the parent's copy, which the I/O thread keeps using. The child process happily writes output data to its copy of the buffers, while the I/O thread's buffers remain unmodified. The same happens in the other direction: network data is written into the I/O thread's input buffers, but the child process never sees anything arrive in its own input buffers. The child and the I/O thread end up waiting for data from each other that will never arrive.
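
The duplication is ordinary copy-on-fork process memory, and a few lines show it. This minimal sketch (POSIX only; the `outbound` list is a hypothetical stand-in for a Connection output buffer) has the child append to a buffer and report its view through its exit status, while the parent checks its own copy.

```python
import os


def buffer_lengths_after_fork():
    """Return (parent_len, child_len) for a list the child appends to
    after fork()."""
    outbound = []                # hypothetical stand-in for an output buffer
    pid = os.fork()
    if pid == 0:
        # Child: the write lands in the child's private copy of `outbound`.
        outbound.append(b"frame")
        os._exit(len(outbound))  # report the child's view via exit status
    _, status = os.waitpid(pid, 0)
    return len(outbound), os.WEXITSTATUS(status)
```

It should return `(0, 1)`: the child sees its own write, but the parent's copy (the one a parent-side I/O thread would be draining) never changes.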

How do we fix this?   Actually, I think the question is: Can we fix this? 

The python-qpid library was never written to be fork-safe. Several of its threads communicate through shared state, so ensuring fork safety would require a rewrite of the library.



> qpid.messaging hangs in child forked process
> --------------------------------------------
>
>                 Key: QPID-6836
>                 URL: https://issues.apache.org/jira/browse/QPID-6836
>             Project: Qpid
>          Issue Type: Bug
>          Components: Python Client
>    Affects Versions: 0.32
>            Reporter: Brian Bouterse
>
> A Python process that uses qpid.messaging and then forks can cause qpid.messaging calls in the child process to hang at random. To reproduce:
> 1. In a Python process, instantiate a qpid.messaging client and call open()
> 2. Call os.fork()
> 3. Try to instantiate another qpid.messaging client and call open()
> 4. Observe that the child process is waiting on a file descriptor that is shared with the parent and will never wake up.
> Here is a potential reproducer with actual python code (from kgiusti):
> {code:none}
> #!/usr/bin/env python
> #
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements.  See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership.  The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License.  You may obtain a copy of the License at
> # 
> #   http://www.apache.org/licenses/LICENSE-2.0
> # 
> # Unless required by applicable law or agreed to in writing,
> # software distributed under the License is distributed on an
> # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> # KIND, either express or implied.  See the License for the
> # specific language governing permissions and limitations
> # under the License.
> #
> import optparse, time
> from qpid.messaging import *
> from qpid.util import URL
> from qpid.log import enable, DEBUG, WARN
> import os
> import sys
> def nameval(st):
>   idx = st.find("=")
>   if idx >= 0:
>     name = st[0:idx]
>     value = st[idx+1:]
>   else:
>     name = st
>     value = None
>   return name, value
> parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
>                                description="Send messages to the supplied address.")
> parser.add_option("-b", "--broker", default="localhost",
>                   help="connect to specified BROKER (default %default)")
> parser.add_option("-r", "--reconnect", action="store_true",
>                   help="enable auto reconnect")
> parser.add_option("-i", "--reconnect-interval", type="float", default=3,
>                   help="interval between reconnect attempts")
> parser.add_option("-l", "--reconnect-limit", type="int",
>                   help="maximum number of reconnect attempts")
> parser.add_option("-c", "--count", type="int", default=1,
>                   help="stop after count messages have been sent, zero disables (default %default)")
> parser.add_option("-d", "--durable", action="store_true",
>                   help="make the message persistent")
> parser.add_option("-t", "--timeout", type="float", default=None,
>                   help="exit after the specified time")
> parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
> parser.add_option("-S", "--subject", help="specify a subject")
> parser.add_option("-R", "--reply-to", help="specify reply-to address")
> parser.add_option("-P", "--property", dest="properties", action="append", default=[],
>                   metavar="NAME=VALUE", help="specify message property")
> parser.add_option("-M", "--map", dest="entries", action="append", default=[],
>                   metavar="KEY=VALUE",
>                   help="specify map entry for message body")
> parser.add_option("-v", dest="verbose", action="store_true",
>                   help="enable logging")
> opts, args = parser.parse_args()
> if opts.verbose:
>   enable("qpid", DEBUG)
> else:
>   enable("qpid", WARN)
> if opts.id is None:
>   spout_id = str(uuid4())
> else:
>   spout_id = opts.id
> if args:
>   addr = args.pop(0)
> else:
>   parser.error("address is required")
> content = None
> content_type = None
> if args:
>   text = " ".join(args)
> else:
>   text = None
> if opts.entries:
>   content = {}
>   if text:
>     content["text"] = text
>   for e in opts.entries:
>     name, val = nameval(e)
>     content[name] = val
> else:
>   content = text
>   # no entries were supplied, so assume text/plain for
>   # compatibility with java (and other) clients
>   content_type = "text/plain"
> conn = Connection(opts.broker,
>                   reconnect=opts.reconnect,
>                   reconnect_interval=opts.reconnect_interval,
>                   reconnect_limit=opts.reconnect_limit)
> conn.open()
> print("Conn open")
> pid = os.fork()
> if not pid:
>     # child: reuses the Connection the parent opened before fork()
>     try:
>         ssn = conn.session()
>         snd = ssn.sender(addr)
>         count = 0
>         start = time.time()
>         while (opts.count == 0 or count < opts.count) and \
>               (opts.timeout is None or time.time() - start < opts.timeout):
>             msg = Message(subject=opts.subject,
>                           reply_to=opts.reply_to,
>                           content=content)
>             if opts.durable:
>                 msg.durable = True
>             if content_type is not None:
>                 msg.content_type = content_type
>             msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
>             for p in opts.properties:
>                 name, val = nameval(p)
>                 msg.properties[name] = val
>             snd.send(msg)
>             count += 1
>             print(msg)
>     except Exception as e:
>         print("booboo: %s" % e)
>         os._exit(1)
>     print("buh-bye")
>     os._exit(0)
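> else:
>     # parent: wait for the child, then exit with the child's exit code
>     status = os.waitpid(pid, 0)[1]
>     conn.close()
>     sys.exit(os.WEXITSTATUS(status) if os.WIFEXITED(status) else 1)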
> {code}


