You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Andreas Fendt (JIRA)" <ji...@apache.org> on 2019/06/03 18:38:00 UTC

[jira] [Comment Edited] (PROTON-2044) Azure IoT Hub local-idle-timeout expired

    [ https://issues.apache.org/jira/browse/PROTON-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16840291#comment-16840291 ] 

Andreas Fendt edited comment on PROTON-2044 at 6/3/19 6:37 PM:
---------------------------------------------------------------

Hallo, thank you for the answer.

I have now refactored the code, so that another Thread just calls the *run()* function of *BlockingConnection*:

{code:python}
import ctypes
import inspect
import json
import threading
from asyncio import AbstractEventLoop
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from threading import Lock
from time import time
from urllib.parse import quote_plus, urlencode

from proton import ProtonException, Message
from proton.utils import BlockingConnection


def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")


class Thread(threading.Thread):
    def _get_my_tid(self):
        """determines this (self's) thread id"""
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        raise AssertionError("could not determine the thread's id")

    def raise_exc(self, exctype):
        """raises the given exception type in the context of this thread"""
        _async_raise(self._get_my_tid(), exctype)

    def terminate(self):
        """raises SystemExit in the context of the given thread, which should
        cause the thread to exit silently (unless caught)"""
        self.raise_exc(SystemExit)


class IotHub:
    def __init__(self):
        self._hostname = f"example-hub.azure-devices.net"
        self._username = f"iothubowner@sas.root.example-hub.azure-devices.net"

        self._blocking_connection = None
        self._sender = None
        self._thread = None

        self.connect()

    @staticmethod
    def generate_sas_token(uri: str, policy: str, key: str, expiry: float = None):
        if not expiry:
            expiry = time() + 3600  # Default to 1 hour.
        encoded_uri = quote_plus(uri)
        ttl = int(expiry)
        sign_key = f"{encoded_uri}\n{ttl}"
        signature = b64encode(HMAC(b64decode(key), sign_key.encode(), sha256).digest())
        result = {"sr": uri, "sig": signature, "se": str(ttl)}
        if policy:
            result["skn"] = policy

        return f"SharedAccessSignature {urlencode(result)}"

    def connect(self):
        # create credentials
        password = self.generate_sas_token(self._hostname,
                                           "iothubowner", "key",
                                           time() + 31557600)  # ttl = 1 Year

        # establish connection
        self._blocking_connection = BlockingConnection(f"amqps://{self._hostname}", allowed_mechs="PLAIN",
                                                       user=self._username, password=password,
                                                       heartbeat=30)
        self._sender = self._blocking_connection.create_sender("/messages/devicebound")

        # keep connection active
        if self._thread and self._thread.is_alive():
            self._thread.terminate()
        self._thread = Thread(target=self.worker, daemon=True)
        self._thread.start()

    def worker(self):
        self._blocking_connection.run()

    def send(self, message: dict, serial_number: str):
        message = Message(address="/devices/{serial_number}/messages/devicebound".format(serial_number=serial_number),
                          body=bytes(json.dumps(message, separators=(",", ":")), "utf-8"))
        message.inferred = True  # disable message encoding
        self._sender.send(message, timeout=20)

{code}

-Now I have the problem, as you allready mentioned, that the Service Bus now disconnectes after about 10 - 15 minutes, which is ok and I have to live with that.-


was (Author: andreas.fendt):
Hallo, thank you for the answer.

I have now refactored the code, so that another Thread just calls the *run()* function of *BlockingConnection*:

{code:python}
import ctypes
import inspect
import json
import threading
from asyncio import AbstractEventLoop
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from threading import Lock
from time import time
from urllib.parse import quote_plus, urlencode

from proton import ProtonException, Message
from proton.utils import BlockingConnection


def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")


class Thread(threading.Thread):
    def _get_my_tid(self):
        """determines this (self's) thread id"""
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        raise AssertionError("could not determine the thread's id")

    def raise_exc(self, exctype):
        """raises the given exception type in the context of this thread"""
        _async_raise(self._get_my_tid(), exctype)

    def terminate(self):
        """raises SystemExit in the context of the given thread, which should
        cause the thread to exit silently (unless caught)"""
        self.raise_exc(SystemExit)


class IotHub:
    def __init__(self):
        self._hostname = f"example-hub.azure-devices.net"
        self._username = f"iothubowner@sas.root.example-hub.azure-devices.net"

        self._blocking_connection = None
        self._sender = None
        self._thread = None

        self.connect()

    @staticmethod
    def generate_sas_token(uri: str, policy: str, key: str, expiry: float = None):
        if not expiry:
            expiry = time() + 3600  # Default to 1 hour.
        encoded_uri = quote_plus(uri)
        ttl = int(expiry)
        sign_key = f"{encoded_uri}\n{ttl}"
        signature = b64encode(HMAC(b64decode(key), sign_key.encode(), sha256).digest())
        result = {"sr": uri, "sig": signature, "se": str(ttl)}
        if policy:
            result["skn"] = policy

        return f"SharedAccessSignature {urlencode(result)}"

    def connect(self):
        # create credentials
        password = self.generate_sas_token(self._hostname,
                                           "iothubowner", "key",
                                           time() + 31557600)  # ttl = 1 Year

        # establish connection
        self._blocking_connection = BlockingConnection(f"amqps://{self._hostname}", allowed_mechs="PLAIN",
                                                       user=self._username, password=password,
                                                       heartbeat=30)
        self._sender = self._blocking_connection.create_sender("/messages/devicebound")

        # keep connection active
        if self._thread and self._thread.is_alive():
            self._thread.terminate()
        self._thread = Thread(target=self.worker, daemon=True)
        self._thread.start()

    def worker(self):
        self._blocking_connection.run()

    def send(self, message: dict, serial_number: str):
        message = Message(address="/devices/{serial_number}/messages/devicebound".format(serial_number=serial_number),
                          body=bytes(json.dumps(message, separators=(",", ":")), "utf-8"))
        message.inferred = True  # disable message encoding
        self._sender.send(message, timeout=20)

{code}

Now I have the problem, as you allready mentioned, that the Service Bus now disconnectes after about 10 - 15 minutes, which is ok and I have to live with that.

> Azure IoT Hub local-idle-timeout expired
> ----------------------------------------
>
>                 Key: PROTON-2044
>                 URL: https://issues.apache.org/jira/browse/PROTON-2044
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: python-binding
>    Affects Versions: proton-c-0.24.0
>         Environment: Operating System: Windows
> Python: 3.6.4
> qpid-proton: 0.24.0
>            Reporter: Andreas Fendt
>            Priority: Major
>
> I'm using following python code to send messages to the devices (*/messages/devicebound*) which are connected to the *azure iot hub*:
> {code}
> import json
> from base64 import b64encode, b64decode
> from hashlib import sha256
> from hmac import HMAC
> from time import time
> from urllib.parse import quote_plus, urlencode
> from proton import ProtonException, Message
> from proton.utils import BlockingConnection
> class IotHub:
>     def __init__(self):
>         self._hostname = f"example-hub.azure-devices.net"
>         self._username = f"iothubowner@sas.root.example-hub.azure-devices.net"
>         self._blocking_connection = None
>         self._sender = None
>         self.connect()
>     @staticmethod
>     def generate_sas_token(uri: str, policy: str, key: str, expiry: float = None):
>         if not expiry:
>             expiry = time() + 3600  # Default to 1 hour.
>         encoded_uri = quote_plus(uri)
>         ttl = int(expiry)
>         sign_key = f"{encoded_uri}\n{ttl}"
>         signature = b64encode(HMAC(b64decode(key), sign_key.encode(), sha256).digest())
>         result = {"sr": uri, "sig": signature, "se": str(ttl)}
>         if policy:
>             result["skn"] = policy
>         return f"SharedAccessSignature {urlencode(result)}"
>     def connect(self):
>         # create credentials
>         password = self.generate_sas_token(self._hostname,
>                                            "iothubowner", "key",
>                                            time() + 31557600)  # ttl = 1 Year
>         # establish connection
>         self._blocking_connection = BlockingConnection(f"amqps://{self._hostname}", allowed_mechs="PLAIN",
>                                                        user=self._username, password=password,
>                                                        heartbeat=30)
>         self._sender = self._blocking_connection.create_sender("/messages/devicebound")
>     def send(self, message: dict, serial_number: str):
>         message = Message(address="/devices/{serial_number}/messages/devicebound".format(serial_number=serial_number),
>                           body=bytes(json.dumps(message, separators=(",", ":")), "utf-8"))
>         message.inferred = True  # disable message encoding
>         self._sender.send(message, timeout=20)
> {code}
> The problem is now that when I don't send any message for some seconds I get following exepction while sending a message:
> {code:java}
> Connection amqps://example-hub.azure-devices.net:amqps disconnected: Condition('amqp:resource-limit-exceeded', 'local-idle-timeout expired')
> {code}
> Whats the reason for that? How can I solve that?
> Thank you for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org