Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/24 13:09:03 UTC

[GitHub] [airflow] malthe opened a new pull request #19806: Psrp advanced options

malthe opened a new pull request #19806:
URL: https://github.com/apache/airflow/pull/19806


   This adds a method `invoke_cmdlet` in addition to the existing `invoke_powershell` – but it also provides a more general `invoke` method which works as a context manager such that you can build up your own command pipelines.
   
   The motivation is that a session will often be constrained to a set of cmdlets (e.g., "Restart-Server"), not a complete shell.
   
   Some additional docstrings have been added as well.
   
   Finally, logging has been made optional (but enabled by default).
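The "build up a pipeline, invoke it on exit" pattern described above can be sketched roughly as follows. This is only an illustration: `FakePipeline` and this module-level `invoke` are hypothetical stand-ins written for the example, they do not contact a remote host, and they are not the hook's actual implementation. The `Name` parameter is likewise made up.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List, Tuple


class FakePipeline:
    """Hypothetical stand-in for a PowerShell pipeline object."""

    def __init__(self) -> None:
        self.commands: List[Tuple[str, Dict[str, str]]] = []
        self.invoked = False

    def add_cmdlet(self, name: str, **parameters: str) -> "FakePipeline":
        # Only record the cmdlet; nothing runs yet.
        self.commands.append((name, parameters))
        return self

    def run(self) -> None:
        # A real pipeline would send the commands to the remote host here.
        self.invoked = True


@contextmanager
def invoke() -> Iterator[FakePipeline]:
    """Yield a pipeline to the caller and run it once the block exits normally."""
    pipeline = FakePipeline()
    yield pipeline
    pipeline.run()


with invoke() as ps:
    ps.add_cmdlet("Restart-Server", Name="web01")
    assert not ps.invoked  # still inside the block, so nothing has run

assert ps.invoked  # the pipeline ran when the block exited
```

Because `run()` sits after the `yield` without a `try`/`finally`, an exception raised inside the `with` block skips the invocation in this sketch.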


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#issuecomment-1027061362


   Merging it now (the errors are really intermittent) to be able to bump constraints and fix static checks.





[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773676792



##########
File path: airflow/providers/microsoft/psrp/operators/psrp.py
##########
@@ -16,51 +16,129 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import List, Optional
+from typing import Any, Dict, List, Optional
+
+from jinja2.nativetypes import NativeEnvironment
+from pypsrp.serializer import TaggedValue
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.psrp.hooks.psrp import PSRPHook
+from airflow.settings import json
 
 
 class PSRPOperator(BaseOperator):
     """PowerShell Remoting Protocol operator.
 
-    :param psrp_conn_id: connection id
+    Use one of the 'command', 'cmdlet', or 'powershell' arguments.
+
+    The 'securestring' template filter can be used to tag a value for
+    serialization into a `System.Security.SecureString` (applicable only
+    for DAGs which have `render_template_as_native_obj=True`).
+
+    The command output is converted to JSON by PowerShell such that the operator
+    return value is serializable to an XCom value.
+
+    :param psrp_conn_id: Connection id
     :type psrp_conn_id: str
-    :param command: command to execute on remote host. (templated)
+    :param command: Command to execute on remote host (templated).
     :type command: str
-    :param powershell: powershell to execute on remote host. (templated)
+    :param powershell: Powershell to execute on remote host (templated)
     :type powershell: str
+    :param cmdlet:
+        Cmdlet to execute on remote host (templated). Also used as the default
+        value for `task_id`.
+    :type cmdlet: str
+    :param parameters:
+        Parameters to provide to cmdlet (templated). This is allowed only if
+        the `cmdlet` parameter is also given.
+    :type parameters: dict
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
     """
 
     template_fields = (
+        "cmdlet",
         "command",
+        "parameters",
         "powershell",
     )
     template_fields_renderers = {"command": "powershell", "powershell": "powershell"}
-    ui_color = "#901dd2"
+    ui_color = "#c2e2ff"
 
     def __init__(
         self,
         *,
         psrp_conn_id: str,
         command: Optional[str] = None,
         powershell: Optional[str] = None,
+        cmdlet: Optional[str] = None,
+        parameters: Optional[Dict[str, str]] = None,
+        logging: bool = True,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
         **kwargs,
     ) -> None:
+        args = {command, powershell, cmdlet}
+        if not len(list(filter(None, args))) == 1:
+            raise ValueError("Must provide either 'command', 'powershell', or 'cmdlet'")

Review comment:
       Fix merged into e9fe6995ffdcdd73e3d92c1c1ba2ddba8b832c5a.







[GitHub] [airflow] malthe commented on a change in pull request #19806: Psrp advanced options

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r756445347



##########
File path: tests/providers/microsoft/psrp/hooks/test_psrp.py
##########
@@ -29,43 +30,67 @@
 CONNECTION_ID = "conn_id"
 
 
-class TestPSRPHook(unittest.TestCase):
-    @patch(
-        f"{PSRPHook.__module__}.{PSRPHook.__name__}.get_connection",
-        return_value=Connection(
-            login='username',
-            password='password',
-            host='remote_host',
-        ),
-    )
-    @patch(f"{PSRPHook.__module__}.WSMan")
-    @patch(f"{PSRPHook.__module__}.PowerShell")
-    @patch(f"{PSRPHook.__module__}.RunspacePool")
-    @patch("logging.Logger.info")
-    def test_invoke_powershell(self, log_info, runspace_pool, powershell, ws_man, get_connection):
-        with PSRPHook(CONNECTION_ID) as hook:
-            ps = powershell.return_value = MagicMock()
-            ps.state = PSInvocationState.RUNNING
-            ps.output = []
-            ps.streams.debug = []
-            ps.streams.information = []
-            ps.streams.error = []
+def mock_powershell():

Review comment:
       I have tried to improve this now in `4ed556aff2b96f1aa35ace02dab669613ebc77a3`. Seems clearer now at least.








[GitHub] [airflow] malthe commented on a change in pull request #19806: Psrp advanced options

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r757108644



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -31,13 +33,19 @@ class PSRPHook(BaseHook):
     Hook for PowerShell Remoting Protocol execution.
 
     The hook must be used as a context manager.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool

Review comment:
       @uranusjr I looked into this in more detail now and the way it works is that there is an _operational timeout_ after which polling must return with an updated status (or a failure) – but it can return earlier.
   
   I did a test where I issued the PowerShell cmdlet `Start-Sleep -s 3; Write-Host "Hello"` (sleep for three seconds) in a loop. Even though the default operational timeout is 20 seconds, each polling action returned with a new status after 3 seconds.
   
   I have updated the code to reflect this new understanding: instead of a polling interval there is now an overridable operational timeout. For example, setting it to 1 in the above example would mean that we get a status update every second, but a "Hello" information stream record only every 3 seconds.
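The "return early, or at the latest when the timeout expires" behaviour described above can be imitated with the standard library. The sketch below uses `queue.Queue.get(timeout=...)` purely as an analogy; `poll` and `demo` are hypothetical names for this example and none of this is pypsrp or WSMan code.

```python
import queue
import threading
from typing import Optional, Tuple


def poll(records: "queue.Queue[str]", operation_timeout: float) -> Optional[str]:
    """Return a record as soon as one arrives, or None once the timeout expires."""
    try:
        return records.get(timeout=operation_timeout)
    except queue.Empty:
        return None


def demo(delay: float, operation_timeout: float) -> Tuple[int, str]:
    """Count how many polls are needed before a delayed record shows up."""
    records: "queue.Queue[str]" = queue.Queue()
    # The producer emits "Hello" after `delay` seconds, loosely mirroring
    # a sleep followed by a write on the remote side.
    threading.Timer(delay, records.put, args=("Hello",)).start()
    polls = 0
    while True:
        polls += 1
        record = poll(records, operation_timeout)
        if record is not None:
            return polls, record
```

With a timeout much longer than the delay, a single poll returns as soon as the record arrives. With a timeout shorter than the delay, several polls come back empty first, which corresponds to the extra status updates mentioned in the comment.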







[GitHub] [airflow] potiuk commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r796600521



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn

Review comment:
       I think it's actually cool to clean up if we can. If we paid this much attention everywhere, I think our memory use during tests would be quite a bit smaller (this is especially important if you have parameterized tests and big test_* files). We sometimes have spikes in memory use during our tests that make public runners fail.
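One detail of the `del self._conn` cleanup in the diff above: the class also defines `_conn = None`, so deleting the instance attribute lets lookups fall back to that class-level default. A minimal sketch with a hypothetical `Resource` class (not the hook itself):

```python
class Resource:
    # Class-level default, visible whenever no instance attribute shadows it.
    _conn = None

    def open(self) -> None:
        # Creates an instance attribute that shadows the class default.
        self._conn = object()

    def close(self) -> None:
        # Removes only the instance attribute; the class default stays put.
        del self._conn


resource = Resource()
resource.open()
assert resource._conn is not None

resource.close()
assert resource._conn is None          # lookup falls back to the class attribute
assert "_conn" not in vars(resource)   # the instance no longer holds a reference
```

Calling `close()` a second time in this sketch raises `AttributeError`, since there is no instance attribute left to delete.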







[GitHub] [airflow] uranusjr commented on a change in pull request #19806: Psrp advanced options

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r756624818



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -31,13 +33,19 @@ class PSRPHook(BaseHook):
     Hook for PowerShell Remoting Protocol execution.
 
     The hook must be used as a context manager.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool

Review comment:
       I wonder if we should instead make `poll_interval` configurable. `logging` is not a great name IMO since it does not show the implication of the process being polled, which I believe can have a performance impact. Allowing the user to configure `poll_interval` can make this more visible, and `poll_interval=None` (or `0` or `-1`?) would be a more natural value to disable polling altogether.
   
   BTW, is logging still possible without polling? i.e. instead of outputting eagerly, gather all outputs and log them after `end_invoke` instead.







[GitHub] [airflow] malthe commented on a change in pull request #19806: Psrp advanced options

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r756626984



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -31,13 +33,19 @@ class PSRPHook(BaseHook):
     Hook for PowerShell Remoting Protocol execution.
 
     The hook must be used as a context manager.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool

Review comment:
       The logs are always available at the end – I don't change any of the recorded information but just keep a pointer to how far into the various streams I have forwarded to Python's logging system.
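The "keep a pointer into each stream" idea can be sketched as below. The streams are plain Python lists here and `forward_new_records` is a hypothetical helper for illustration; the real stream objects and log handlers in the hook may differ.

```python
from typing import Callable, List, Sequence, Tuple

Stream = List[str]
Handler = Callable[[str], None]


def forward_new_records(
    streams: Sequence[Tuple[Stream, Handler]], offsets: List[int]
) -> None:
    """Pass records that appeared since the last call to their handler.

    The streams themselves are never modified; only the offsets advance.
    """
    for i, (stream, handler) in enumerate(streams):
        new_records = stream[offsets[i]:]
        for record in new_records:
            handler(record)
        offsets[i] += len(new_records)


logged: List[str] = []
information: Stream = []
streams = [(information, logged.append)]
offsets = [0 for _ in streams]

information.append("Hello")
forward_new_records(streams, offsets)
forward_new_records(streams, offsets)  # nothing new, so nothing is forwarded twice
assert logged == ["Hello"]
assert information == ["Hello"]  # the recorded data is still available afterwards
```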







[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773676680



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):

Review comment:
       Fix merged into e9fe6995ffdcdd73e3d92c1c1ba2ddba8b832c5a.







[GitHub] [airflow] potiuk commented on pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#issuecomment-1019530962


   How about rebasing it, @malthe?
   





[GitHub] [airflow] uranusjr commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773546162



##########
File path: airflow/providers/microsoft/psrp/operators/psrp.py
##########
@@ -16,51 +16,129 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import List, Optional
+from typing import Any, Dict, List, Optional
+
+from jinja2.nativetypes import NativeEnvironment
+from pypsrp.serializer import TaggedValue
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.psrp.hooks.psrp import PSRPHook
+from airflow.settings import json
 
 
 class PSRPOperator(BaseOperator):
     """PowerShell Remoting Protocol operator.
 
-    :param psrp_conn_id: connection id
+    Use one of the 'command', 'cmdlet', or 'powershell' arguments.
+
+    The 'securestring' template filter can be used to tag a value for
+    serialization into a `System.Security.SecureString` (applicable only
+    for DAGs which have `render_template_as_native_obj=True`).
+
+    The command output is converted to JSON by PowerShell such that the operator
+    return value is serializable to an XCom value.
+
+    :param psrp_conn_id: Connection id
     :type psrp_conn_id: str
-    :param command: command to execute on remote host. (templated)
+    :param command: Command to execute on remote host (templated).
     :type command: str
-    :param powershell: powershell to execute on remote host. (templated)
+    :param powershell: Powershell to execute on remote host (templated)
     :type powershell: str
+    :param cmdlet:
+        Cmdlet to execute on remote host (templated). Also used as the default
+        value for `task_id`.
+    :type cmdlet: str
+    :param parameters:
+        Parameters to provide to cmdlet (templated). This is allowed only if
+        the `cmdlet` parameter is also given.
+    :type parameters: dict
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
     """
 
     template_fields = (
+        "cmdlet",
         "command",
+        "parameters",
         "powershell",
     )
     template_fields_renderers = {"command": "powershell", "powershell": "powershell"}
-    ui_color = "#901dd2"
+    ui_color = "#c2e2ff"
 
     def __init__(
         self,
         *,
         psrp_conn_id: str,
         command: Optional[str] = None,
         powershell: Optional[str] = None,
+        cmdlet: Optional[str] = None,
+        parameters: Optional[Dict[str, str]] = None,
+        logging: bool = True,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
         **kwargs,
     ) -> None:
+        args = {command, powershell, cmdlet}
+        if not len(list(filter(None, args))) == 1:
+            raise ValueError("Must provide either 'command', 'powershell', or 'cmdlet'")

Review comment:
       Can this not use the same logic as `exactly_one`?
   
   In either case you want to change `not ... ==` to `!=`. Also the message should say “exactly one” instead of “either” (which implies it’s allowed to pass multiple of them).
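A side note on the check in the diff: because `args` is a set, two arguments with the same value collapse into one element, so the check can pass even when two of them were given. The sketch below shows a count-based alternative; `count_given` and `validate` are hypothetical names for illustration and this is not Airflow's `exactly_one` helper.

```python
from typing import Optional


def count_given(*values: Optional[str]) -> int:
    """Count the arguments that were actually provided (truthy)."""
    return sum(1 for value in values if value)


def validate(
    command: Optional[str] = None,
    powershell: Optional[str] = None,
    cmdlet: Optional[str] = None,
) -> None:
    if count_given(command, powershell, cmdlet) != 1:
        raise ValueError(
            "Must provide exactly one of 'command', 'powershell', or 'cmdlet'"
        )


# The set-based variant deduplicates equal values before counting:
assert len(list(filter(None, {"Get-Date", "Get-Date", None}))) == 1

validate(cmdlet="Get-Date")  # exactly one argument: accepted
try:
    validate(command="Get-Date", powershell="Get-Date")
except ValueError:
    pass  # two arguments with the same value are rejected here
```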







[GitHub] [airflow] potiuk commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r796608439



##########
File path: setup.py
##########
@@ -450,7 +450,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
     pandas_requirement,
 ]
 psrp = [
-    'pypsrp~=0.5',
+    'pypsrp~=0.8',

Review comment:
       Is 0.8 really necessary here? We usually limit versions only when we know earlier versions will not work, to avoid overly "strict" limits. Is there something that prevents us from using ~0.7 here? Also, how about `> 0.8`? We have had a discussion about not limiting the upper bound, because we do not really know the future and constraints already protect our users, so limiting the upper bound (in this case <1.0) might not be justified at all.
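For context on the specifier under discussion: under PEP 440, `~=0.8` is shorthand for `>=0.8, ==0.*`, so it carries an implicit upper bound below 1.0. The sketch below is a deliberately simplified check for purely numeric versions; `parse` and `compatible` are hypothetical helpers for illustration and do not replace a real version-handling library.

```python
from typing import Tuple


def parse(version: str) -> Tuple[int, ...]:
    """Split a purely numeric version such as '0.8.1' into integers."""
    return tuple(int(part) for part in version.split("."))


def compatible(version: str, spec: str) -> bool:
    """Simplified `~=spec` test: at least `spec`, same leading components."""
    base = parse(spec)
    if len(base) < 2:
        raise ValueError("a compatible-release spec needs at least two components")
    candidate = parse(version)
    # Pad with zeros so that '0.8' and '0.8.0' compare as equal.
    width = max(len(candidate), len(base))
    padded_candidate = candidate + (0,) * (width - len(candidate))
    padded_base = base + (0,) * (width - len(base))
    prefix = base[:-1]  # every component except the last must match exactly
    return padded_candidate >= padded_base and padded_candidate[: len(prefix)] == prefix


assert compatible("0.8", "0.8")
assert compatible("0.9.3", "0.8")
assert not compatible("0.7.4", "0.8")  # below the lower bound
assert not compatible("1.0", "0.8")    # excluded by the implicit upper bound
```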

##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn
 
-    def invoke_powershell(self, script: str) -> PowerShell:
-        with RunspacePool(self._client) as pool:
-            ps = PowerShell(pool)
-            ps.add_script(script)
+    def get_conn(self) -> RunspacePool:
+        """
+        Returns a runspace pool.
+
+        The returned object must be used as a context manager.
+        """
+        conn = self.get_connection(self.conn_id)
+        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
+
+        extra = conn.extra_dejson.copy()
+
+        def apply_extra(d, keys):
+            d = d.copy()
+            for key in keys:
+                value = extra.pop(key, None)
+                if value is not None:
+                    d[key] = value
+            return d
+
+        wsman_options = apply_extra(
+            self._wsman_options,
+            (
+                "auth",
+                "cert_validation",
+                "connection_timeout",
+                "locale",
+                "read_timeout",
+                "reconnection_retries",
+                "reconnection_backoff",
+                "ssl",
+            ),
+        )
+        wsman = WSMan(conn.host, username=conn.login, password=conn.password, **wsman_options)
+        runspace_options = apply_extra(self._runspace_options, ("configuration_name",))
+
+        if extra:
+            raise AirflowException(f"Unexpected extra configuration keys: {', '.join(sorted(extra))}")
+        pool = RunspacePool(wsman, **runspace_options)
+        self._wsman_ref[pool] = wsman
+        return pool
+
+    @contextmanager
+    def invoke(self) -> PowerShell:
+        """
+        Context manager that yields a PowerShell object to which commands can be
+        added. Upon exit, the commands will be invoked.
+        """
+        local_context = self._conn is None
+        if local_context:
+            self.__enter__()
+        try:
+            ps = PowerShell(self._conn)
+            yield ps
             ps.begin_invoke()
-            streams = [
-                (ps.output, self._log_output),
-                (ps.streams.debug, self._log_record),
-                (ps.streams.information, self._log_record),
-                (ps.streams.error, self._log_record),
-            ]
-            offsets = [0 for _ in streams]
-
-            # We're using polling to make sure output and streams are
-            # handled while the process is running.
-            while ps.state == PSInvocationState.RUNNING:
-                sleep(self._poll_interval)
-                ps.poll_invoke()
-
-                for (i, (stream, handler)) in enumerate(streams):
-                    offset = offsets[i]
-                    while len(stream) > offset:
-                        handler(stream[offset])
-                        offset += 1
-                    offsets[i] = offset
+            if self._logging:
+                streams = [
+                    (ps.streams.debug, self._log_record),
+                    (ps.streams.error, self._log_record),
+                    (ps.streams.information, self._log_record),
+                    (ps.streams.progress, self._log_record),
+                    (ps.streams.verbose, self._log_record),
+                    (ps.streams.warning, self._log_record),
+                ]
+                offsets = [0 for _ in streams]
+
+                # We're using polling to make sure output and streams are
+                # handled while the process is running.
+                while ps.state == PSInvocationState.RUNNING:
+                    ps.poll_invoke(timeout=self._operation_timeout)
+
+                    for (i, (stream, handler)) in enumerate(streams):
+                        offset = offsets[i]
+                        while len(stream) > offset:
+                            handler(stream[offset])
+                            offset += 1
+                        offsets[i] = offset
 
             # For good measure, we'll make sure the process has
-            # stopped running.
+            # stopped running in any case.
             ps.end_invoke()
 
+            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
             if ps.streams.error:
                 raise AirflowException("Process had one or more errors")
+        finally:
+            if local_context:
+                self.__exit__(None, None, None)
 
-            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
-            return ps
+    def invoke_cmdlet(self, name: str, use_local_scope=None, **parameters: Dict[str, str]) -> PowerShell:
+        """Invoke a PowerShell cmdlet and return session."""
+        with self.invoke() as ps:
+            ps.add_cmdlet(name, use_local_scope=use_local_scope)
+            ps.add_parameters(parameters)
+        return ps
 
-    def _log_output(self, message: str):
-        self.log.info("%s", message)
+    def invoke_powershell(self, script: str) -> PowerShell:
+        """Invoke a PowerShell script and return session."""
+        with self.invoke() as ps:
+            ps.add_script(script)
+        return ps
 
     def _log_record(self, record):
-        # TODO: Consider translating some or all of these records into
-        # normal logging levels, using `log(level, msg, *args)`.
-        if isinstance(record, ErrorRecord):
-            self.log.info("Error: %s", record)
-            return
-
-        if isinstance(record, InformationRecord):
-            self.log.info("Information: %s", record.message_data)
-            return
-
-        if isinstance(record, ProgressRecord):
+        message_type = getattr(record, "MESSAGE_TYPE", None)
+
+        # There seems to be a problem with some record types; we'll assume
+        # that the class name matches a message type.
+        if message_type is None:
+            message_type = getattr(
+                MessageType, re.sub('(?!^)([A-Z]+)', r'_\1', type(record).__name__).upper()
+            )
+
+        if message_type == MessageType.ERROR_RECORD:
+            self.log.info("%s: %s", record.reason, record)
+            if record.script_stacktrace:
+                for trace in record.script_stacktrace.split('\r\n'):

Review comment:
       I think using `splitlines()` is indeed better; at the very least, splitting on `'\r\n'` raises an eyebrow for me: why not `splitlines()`?
   
   The PSRP <> Windows association is not at all obvious to someone who just reviews the code.
   I think we should either explain in a comment why we are doing it or (better, IMHO) use `splitlines()` and forget about it.
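
   To illustrate the difference, here is a minimal sketch. The stack trace text is made up for the example and is not real PSRP output; only the line endings matter.

```python
# Hypothetical stack trace with mixed line endings (made up for illustration).
stacktrace = (
    "at <ScriptBlock>, <No file>: line 1\r\n"
    "at Invoke-Thing, script.ps1: line 7\n"
    "at Main, script.ps1: line 2\r\n"
)

# Splitting on '\r\n' only: the bare '\n' is not treated as a separator,
# and the trailing terminator produces an empty last element.
by_crlf = stacktrace.split("\r\n")

# splitlines() treats '\r\n', '\n' and '\r' alike and does not produce
# a trailing empty element.
by_lines = stacktrace.splitlines()

for line in by_lines:
    print(line)
```

   With `splitlines()` the logging loop does not need to know which platform produced the text.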







[GitHub] [airflow] malthe closed pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe closed pull request #19806:
URL: https://github.com/apache/airflow/pull/19806


   





[GitHub] [airflow] malthe commented on pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#issuecomment-1022052944


   @potiuk rebased and ready for review.





[GitHub] [airflow] potiuk closed pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #19806:
URL: https://github.com/apache/airflow/pull/19806


   





[GitHub] [airflow] potiuk merged pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #19806:
URL: https://github.com/apache/airflow/pull/19806


   





[GitHub] [airflow] uranusjr commented on a change in pull request #19806: Psrp advanced options

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r756083794



##########
File path: tests/providers/microsoft/psrp/hooks/test_psrp.py
##########
@@ -29,43 +30,67 @@
 CONNECTION_ID = "conn_id"
 
 
-class TestPSRPHook(unittest.TestCase):
-    @patch(
-        f"{PSRPHook.__module__}.{PSRPHook.__name__}.get_connection",
-        return_value=Connection(
-            login='username',
-            password='password',
-            host='remote_host',
-        ),
-    )
-    @patch(f"{PSRPHook.__module__}.WSMan")
-    @patch(f"{PSRPHook.__module__}.PowerShell")
-    @patch(f"{PSRPHook.__module__}.RunspacePool")
-    @patch("logging.Logger.info")
-    def test_invoke_powershell(self, log_info, runspace_pool, powershell, ws_man, get_connection):
-        with PSRPHook(CONNECTION_ID) as hook:
-            ps = powershell.return_value = MagicMock()
-            ps.state = PSInvocationState.RUNNING
-            ps.output = []
-            ps.streams.debug = []
-            ps.streams.information = []
-            ps.streams.error = []
+def mock_powershell():

Review comment:
       FWIW you can subclass `Mock` (or `MagicMock`). This could make this a bit more readable?
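
   A rough sketch of what such a subclass might look like. The class name `MockPowerShell`, the string states, and the `poll_invoke` body are assumptions made for illustration (the real test would use `PSInvocationState`); the attribute names mirror the ones set up in the test above.

```python
from unittest.mock import MagicMock


class MockPowerShell(MagicMock):
    """Hypothetical stand-in for the PowerShell object used by the hook."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Plain strings stand in for PSInvocationState members here.
        self.state = "RUNNING"
        self.output = []
        self.streams.debug = []
        self.streams.information = []
        self.streams.error = []

    def _get_child_mock(self, **kw):
        # Child mocks default to the parent's class; returning a plain
        # MagicMock keeps __init__ above from running again for every child.
        return MagicMock(**kw)

    def poll_invoke(self, timeout=None):
        # Pretend the pipeline finishes after the first poll.
        self.state = "COMPLETED"


ps = MockPowerShell()
ps.add_script("Write-Output 'hello'")
ps.poll_invoke()
print(ps.state)
```

   Overriding `_get_child_mock` matters here: without it, accessing `self.streams` inside `__init__` would create another `MockPowerShell`, which would run `__init__` again.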







[GitHub] [airflow] uranusjr commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773539930



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.

Review comment:
       How important is it to you to support the non-context manager use case? I feel it’s kind of unnecessarily complicating the implementation.

##########
File path: airflow/providers/microsoft/psrp/operators/psrp.py
##########
@@ -16,51 +16,129 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import List, Optional
+from typing import Any, Dict, List, Optional
+
+from jinja2.nativetypes import NativeEnvironment
+from pypsrp.serializer import TaggedValue
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.microsoft.psrp.hooks.psrp import PSRPHook
+from airflow.settings import json
 
 
 class PSRPOperator(BaseOperator):
     """PowerShell Remoting Protocol operator.
 
-    :param psrp_conn_id: connection id
+    Use one of the 'command', 'cmdlet', or 'powershell' arguments.
+
+    The 'securestring' template filter can be used to tag a value for
+    serialization into a `System.Security.SecureString` (applicable only
+    for DAGs which have `render_template_as_native_obj=True`).
+
+    The command output is converted to JSON by PowerShell such that the operator
+    return value is serializable to an XCom value.
+
+    :param psrp_conn_id: Connection id
     :type psrp_conn_id: str
-    :param command: command to execute on remote host. (templated)
+    :param command: Command to execute on remote host (templated).
     :type command: str
-    :param powershell: powershell to execute on remote host. (templated)
+    :param powershell: Powershell to execute on remote host (templated)
     :type powershell: str
+    :param cmdlet:
+        Cmdlet to execute on remote host (templated). Also used as the default
+        value for `task_id`.
+    :type cmdlet: str
+    :param parameters:
+        Parameters to provide to cmdlet (templated). This is allowed only if
+        the `cmdlet` parameter is also given.
+    :type parameters: dict
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
     """
 
     template_fields = (
+        "cmdlet",
         "command",
+        "parameters",
         "powershell",
     )
     template_fields_renderers = {"command": "powershell", "powershell": "powershell"}
-    ui_color = "#901dd2"
+    ui_color = "#c2e2ff"
 
     def __init__(
         self,
         *,
         psrp_conn_id: str,
         command: Optional[str] = None,
         powershell: Optional[str] = None,
+        cmdlet: Optional[str] = None,
+        parameters: Optional[Dict[str, str]] = None,
+        logging: bool = True,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
         **kwargs,
     ) -> None:
+        args = {command, powershell, cmdlet}
+        if not len(list(filter(None, args))) == 1:
+            raise ValueError("Must provide either 'command', 'powershell', or 'cmdlet'")

Review comment:
       Can this not use the same logic as `exactly_one`?
   
   In either case you want to change `not ... ==` to `!=`. Also, the message should say “exactly one” instead of “either” (which implies it’s allowed to pass multiple of them).
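
   Something along these lines, as a sketch only. The `exactly_one` below is a local stand-in written for this comment, not necessarily how the Airflow helper is implemented, and `validate` is a hypothetical function standing in for the check in `__init__`.

```python
from typing import Optional


def exactly_one(*values: object) -> bool:
    """Local stand-in: true if exactly one of the values is truthy."""
    return sum(map(bool, values)) == 1


def validate(
    command: Optional[str] = None,
    powershell: Optional[str] = None,
    cmdlet: Optional[str] = None,
) -> None:
    # Hypothetical stand-in for the argument check in PSRPOperator.__init__.
    if not exactly_one(command, powershell, cmdlet):
        raise ValueError("Must provide exactly one of 'command', 'powershell', or 'cmdlet'")


validate(cmdlet="Restart-Server")  # passes silently

# The set-based check in the diff collapses equal values, so passing the
# same string for two arguments is counted as a single argument.
collapsed = {"x", "x", None}
print(len(list(filter(None, collapsed))))
```

   Counting positional values instead of building a set also avoids that edge case where two arguments happen to carry the same string.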

##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn
 
-    def invoke_powershell(self, script: str) -> PowerShell:
-        with RunspacePool(self._client) as pool:
-            ps = PowerShell(pool)
-            ps.add_script(script)
+    def get_conn(self) -> RunspacePool:
+        """
+        Returns a runspace pool.
+
+        The returned object must be used as a context manager.
+        """
+        conn = self.get_connection(self.conn_id)
+        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
+
+        extra = conn.extra_dejson.copy()
+
+        def apply_extra(d, keys):
+            d = d.copy()
+            for key in keys:
+                value = extra.pop(key, None)
+                if value is not None:
+                    d[key] = value
+            return d
+
+        wsman_options = apply_extra(
+            self._wsman_options,
+            (
+                "auth",
+                "cert_validation",
+                "connection_timeout",
+                "locale",
+                "read_timeout",
+                "reconnection_retries",
+                "reconnection_backoff",
+                "ssl",
+            ),
+        )
+        wsman = WSMan(conn.host, username=conn.login, password=conn.password, **wsman_options)
+        runspace_options = apply_extra(self._runspace_options, ("configuration_name",))
+
+        if extra:
+            raise AirflowException(f"Unexpected extra configuration keys: {', '.join(sorted(extra))}")
+        pool = RunspacePool(wsman, **runspace_options)
+        self._wsman_ref[pool] = wsman
+        return pool
+
+    @contextmanager
+    def invoke(self) -> PowerShell:
+        """
+        Context manager that yields a PowerShell object to which commands can be
+        added. Upon exit, the commands will be invoked.
+        """
+        local_context = self._conn is None
+        if local_context:
+            self.__enter__()
+        try:
+            ps = PowerShell(self._conn)
+            yield ps
             ps.begin_invoke()
-            streams = [
-                (ps.output, self._log_output),
-                (ps.streams.debug, self._log_record),
-                (ps.streams.information, self._log_record),
-                (ps.streams.error, self._log_record),
-            ]
-            offsets = [0 for _ in streams]
-
-            # We're using polling to make sure output and streams are
-            # handled while the process is running.
-            while ps.state == PSInvocationState.RUNNING:
-                sleep(self._poll_interval)
-                ps.poll_invoke()
-
-                for (i, (stream, handler)) in enumerate(streams):
-                    offset = offsets[i]
-                    while len(stream) > offset:
-                        handler(stream[offset])
-                        offset += 1
-                    offsets[i] = offset
+            if self._logging:
+                streams = [
+                    (ps.streams.debug, self._log_record),
+                    (ps.streams.error, self._log_record),
+                    (ps.streams.information, self._log_record),
+                    (ps.streams.progress, self._log_record),
+                    (ps.streams.verbose, self._log_record),
+                    (ps.streams.warning, self._log_record),
+                ]
+                offsets = [0 for _ in streams]
+
+                # We're using polling to make sure output and streams are
+                # handled while the process is running.
+                while ps.state == PSInvocationState.RUNNING:
+                    ps.poll_invoke(timeout=self._operation_timeout)
+
+                    for (i, (stream, handler)) in enumerate(streams):
+                        offset = offsets[i]
+                        while len(stream) > offset:
+                            handler(stream[offset])
+                            offset += 1
+                        offsets[i] = offset
 
             # For good measure, we'll make sure the process has
-            # stopped running.
+            # stopped running in any case.
             ps.end_invoke()
 
+            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
             if ps.streams.error:
                 raise AirflowException("Process had one or more errors")
+        finally:
+            if local_context:
+                self.__exit__(None, None, None)
 
-            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
-            return ps
+    def invoke_cmdlet(self, name: str, use_local_scope=None, **parameters: Dict[str, str]) -> PowerShell:
+        """Invoke a PowerShell cmdlet and return session."""
+        with self.invoke() as ps:
+            ps.add_cmdlet(name, use_local_scope=use_local_scope)
+            ps.add_parameters(parameters)
+        return ps
 
-    def _log_output(self, message: str):
-        self.log.info("%s", message)
+    def invoke_powershell(self, script: str) -> PowerShell:
+        """Invoke a PowerShell script and return session."""
+        with self.invoke() as ps:
+            ps.add_script(script)
+        return ps
 
     def _log_record(self, record):
-        # TODO: Consider translating some or all of these records into
-        # normal logging levels, using `log(level, msg, *args)`.
-        if isinstance(record, ErrorRecord):
-            self.log.info("Error: %s", record)
-            return
-
-        if isinstance(record, InformationRecord):
-            self.log.info("Information: %s", record.message_data)
-            return
-
-        if isinstance(record, ProgressRecord):
+        message_type = getattr(record, "MESSAGE_TYPE", None)
+
+        # There seems to be a problem with some record types; we'll assume
+        # that the class name matches a message type.
+        if message_type is None:
+            message_type = getattr(
+                MessageType, re.sub('(?!^)([A-Z]+)', r'_\1', type(record).__name__).upper()
+            )
+
+        if message_type == MessageType.ERROR_RECORD:
+            self.log.info("%s: %s", record.reason, record)
+            if record.script_stacktrace:
+                for trace in record.script_stacktrace.split('\r\n'):

Review comment:
       `str.splitlines()`

##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn
 
-    def invoke_powershell(self, script: str) -> PowerShell:
-        with RunspacePool(self._client) as pool:
-            ps = PowerShell(pool)
-            ps.add_script(script)
+    def get_conn(self) -> RunspacePool:
+        """
+        Returns a runspace pool.
+
+        The returned object must be used as a context manager.
+        """
+        conn = self.get_connection(self.conn_id)
+        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
+
+        extra = conn.extra_dejson.copy()
+
+        def apply_extra(d, keys):
+            d = d.copy()
+            for key in keys:
+                value = extra.pop(key, None)
+                if value is not None:
+                    d[key] = value
+            return d
+
+        wsman_options = apply_extra(
+            self._wsman_options,
+            (
+                "auth",
+                "cert_validation",
+                "connection_timeout",
+                "locale",
+                "read_timeout",
+                "reconnection_retries",
+                "reconnection_backoff",
+                "ssl",
+            ),
+        )
+        wsman = WSMan(conn.host, username=conn.login, password=conn.password, **wsman_options)
+        runspace_options = apply_extra(self._runspace_options, ("configuration_name",))
+
+        if extra:
+            raise AirflowException(f"Unexpected extra configuration keys: {', '.join(sorted(extra))}")
+        pool = RunspacePool(wsman, **runspace_options)
+        self._wsman_ref[pool] = wsman
+        return pool
+
+    @contextmanager
+    def invoke(self) -> PowerShell:
+        """
+        Context manager that yields a PowerShell object to which commands can be
+        added. Upon exit, the commands will be invoked.
+        """
+        local_context = self._conn is None
+        if local_context:
+            self.__enter__()
+        try:
+            ps = PowerShell(self._conn)
+            yield ps
             ps.begin_invoke()
-            streams = [
-                (ps.output, self._log_output),
-                (ps.streams.debug, self._log_record),
-                (ps.streams.information, self._log_record),
-                (ps.streams.error, self._log_record),
-            ]
-            offsets = [0 for _ in streams]
-
-            # We're using polling to make sure output and streams are
-            # handled while the process is running.
-            while ps.state == PSInvocationState.RUNNING:
-                sleep(self._poll_interval)
-                ps.poll_invoke()
-
-                for (i, (stream, handler)) in enumerate(streams):
-                    offset = offsets[i]
-                    while len(stream) > offset:
-                        handler(stream[offset])
-                        offset += 1
-                    offsets[i] = offset
+            if self._logging:
+                streams = [
+                    (ps.streams.debug, self._log_record),
+                    (ps.streams.error, self._log_record),
+                    (ps.streams.information, self._log_record),
+                    (ps.streams.progress, self._log_record),
+                    (ps.streams.verbose, self._log_record),
+                    (ps.streams.warning, self._log_record),
+                ]
+                offsets = [0 for _ in streams]
+
+                # We're using polling to make sure output and streams are
+                # handled while the process is running.
+                while ps.state == PSInvocationState.RUNNING:
+                    ps.poll_invoke(timeout=self._operation_timeout)
+
+                    for (i, (stream, handler)) in enumerate(streams):
+                        offset = offsets[i]
+                        while len(stream) > offset:
+                            handler(stream[offset])
+                            offset += 1
+                        offsets[i] = offset
 
             # For good measure, we'll make sure the process has
-            # stopped running.
+            # stopped running in any case.
             ps.end_invoke()
 
+            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
             if ps.streams.error:
                 raise AirflowException("Process had one or more errors")
+        finally:
+            if local_context:
+                self.__exit__(None, None, None)
 
-            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
-            return ps
+    def invoke_cmdlet(self, name: str, use_local_scope=None, **parameters: Dict[str, str]) -> PowerShell:
+        """Invoke a PowerShell cmdlet and return session."""
+        with self.invoke() as ps:
+            ps.add_cmdlet(name, use_local_scope=use_local_scope)
+            ps.add_parameters(parameters)
+        return ps
 
-    def _log_output(self, message: str):
-        self.log.info("%s", message)
+    def invoke_powershell(self, script: str) -> PowerShell:
+        """Invoke a PowerShell script and return session."""
+        with self.invoke() as ps:
+            ps.add_script(script)
+        return ps
 
     def _log_record(self, record):
-        # TODO: Consider translating some or all of these records into
-        # normal logging levels, using `log(level, msg, *args)`.
-        if isinstance(record, ErrorRecord):
-            self.log.info("Error: %s", record)
-            return
-
-        if isinstance(record, InformationRecord):
-            self.log.info("Information: %s", record.message_data)
-            return
-
-        if isinstance(record, ProgressRecord):
+        message_type = getattr(record, "MESSAGE_TYPE", None)
+
+        # There seems to be a problem with some record types; we'll assume
+        # that the class name matches a message type.
+        if message_type is None:
+            message_type = getattr(
+                MessageType, re.sub('(?!^)([A-Z]+)', r'_\1', type(record).__name__).upper()
+            )

Review comment:
       Should we provide some sort of fallback in case even this fails? I’m not that confident in this relation between class name and message type.
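
       One possible shape for such a fallback, as a minimal sketch only: pass a default to the second `getattr` so that an unknown class name yields `None` instead of raising `AttributeError`. The `MessageType` class below is a hypothetical stand-in with made-up values (not the real `pypsrp.messages.MessageType`), and `class_name_to_constant` and `resolve_message_type` are illustrative helper names that do not appear in the PR.

```python
import re
from typing import Optional


class MessageType:
    """Hypothetical stand-in for pypsrp.messages.MessageType (values are made up)."""

    ERROR_RECORD = 1
    INFORMATION_RECORD = 2


class ErrorRecord:
    """Record type without a MESSAGE_TYPE attribute, as in the case discussed."""


class SomethingElse:
    """Record type whose class name matches no message type."""


def class_name_to_constant(name: str) -> str:
    # Same regex as in the diff: prefix every run of capitals (except one at
    # the very start of the name) with "_", then upper-case the result.
    return re.sub('(?!^)([A-Z]+)', r'_\1', name).upper()


def resolve_message_type(record: object) -> Optional[int]:
    message_type = getattr(record, "MESSAGE_TYPE", None)
    if message_type is None:
        # The third argument makes the lookup return None rather than raise
        # AttributeError when the class name does not map to a message type.
        message_type = getattr(
            MessageType, class_name_to_constant(type(record).__name__), None
        )
    return message_type
```

       A caller could then treat a `None` result as "unknown record type" and log the record at a default level rather than failing the task.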

##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):

Review comment:
       Hmm, we’ve recently been renaming things in the AWS provider; maybe we should also take the chance to rename this to `PsrpHook` to fit the naming convention (and the operator as well).

##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn

Review comment:
       ```suggestion
               self._conn = None
   ```
   
   It feels unnecessarily complicated to delete the instance-bound variable. An extra entry in `__dict__` is not that big a deal. This also prevents the class-bound `_conn` from being accidentally deleted if `__exit__` is called without `__enter__` (not that it’s likely in any way, but it’s a risk we don’t need to take in the first place).







[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773661323



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn

Review comment:
       Just a small correction: you can't delete a class variable from an instance – that is not possible.
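
       The attribute semantics under discussion can be checked with a small standalone sketch. The `Hook` class and its method names below are hypothetical and only mimic the class-level `_conn = None` default from the diff; nothing here touches pypsrp or Airflow.

```python
class Hook:
    _conn = None  # class-level default, as in the diff

    def enter(self):
        # Assigning through the instance creates an instance attribute
        # that shadows the class-level default.
        self._conn = "pool"

    def exit_with_del(self):
        # Removes only the instance attribute; the class attribute stays.
        del self._conn


hook = Hook()
hook.enter()
hook.exit_with_del()

# With the instance attribute gone, the class default is visible again.
state_after_exit = hook._conn

# A second exit without a matching enter has no instance attribute to
# delete, so Python raises AttributeError instead of touching the class.
try:
    hook.exit_with_del()
except AttributeError:
    second_exit_raised = True
else:
    second_exit_raised = False

class_default_intact = "_conn" in vars(Hook)
```

       So the class attribute is never at risk, but an unmatched `__exit__` would raise with `del`, whereas assigning `None` would pass silently.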







[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773666740



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):

Review comment:
       Or `PowerShellRemotingHook` – something like that might help users who don't know that this is what they should use.







[GitHub] [airflow] github-actions[bot] commented on pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#issuecomment-1027061504


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk closed pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #19806:
URL: https://github.com/apache/airflow/pull/19806


   





[GitHub] [airflow] malthe commented on pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#issuecomment-994975325


   @uranusjr so this became a bigger increment in the functionality of this operator than first anticipated.
   
   The design has been verified to work in practice with real-world requirements.
   
   In order to support the new `securestring` template filter, code has been contributed in https://github.com/jborean93/pypsrp/pull/128, and pypsrp has since released version 0.7.0 with the necessary support (which is why the version requirement has been bumped).





[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773663469



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn
 
-    def invoke_powershell(self, script: str) -> PowerShell:
-        with RunspacePool(self._client) as pool:
-            ps = PowerShell(pool)
-            ps.add_script(script)
+    def get_conn(self) -> RunspacePool:
+        """
+        Returns a runspace pool.
+
+        The returned object must be used as a context manager.
+        """
+        conn = self.get_connection(self.conn_id)
+        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
+
+        extra = conn.extra_dejson.copy()
+
+        def apply_extra(d, keys):
+            d = d.copy()
+            for key in keys:
+                value = extra.pop(key, None)
+                if value is not None:
+                    d[key] = value
+            return d
+
+        wsman_options = apply_extra(
+            self._wsman_options,
+            (
+                "auth",
+                "cert_validation",
+                "connection_timeout",
+                "locale",
+                "read_timeout",
+                "reconnection_retries",
+                "reconnection_backoff",
+                "ssl",
+            ),
+        )
+        wsman = WSMan(conn.host, username=conn.login, password=conn.password, **wsman_options)
+        runspace_options = apply_extra(self._runspace_options, ("configuration_name",))
+
+        if extra:
+            raise AirflowException(f"Unexpected extra configuration keys: {', '.join(sorted(extra))}")
+        pool = RunspacePool(wsman, **runspace_options)
+        self._wsman_ref[pool] = wsman
+        return pool
+
+    @contextmanager
+    def invoke(self) -> PowerShell:
+        """
+        Context manager that yields a PowerShell object to which commands can be
+        added. Upon exit, the commands will be invoked.
+        """
+        local_context = self._conn is None
+        if local_context:
+            self.__enter__()
+        try:
+            ps = PowerShell(self._conn)
+            yield ps
             ps.begin_invoke()
-            streams = [
-                (ps.output, self._log_output),
-                (ps.streams.debug, self._log_record),
-                (ps.streams.information, self._log_record),
-                (ps.streams.error, self._log_record),
-            ]
-            offsets = [0 for _ in streams]
-
-            # We're using polling to make sure output and streams are
-            # handled while the process is running.
-            while ps.state == PSInvocationState.RUNNING:
-                sleep(self._poll_interval)
-                ps.poll_invoke()
-
-                for (i, (stream, handler)) in enumerate(streams):
-                    offset = offsets[i]
-                    while len(stream) > offset:
-                        handler(stream[offset])
-                        offset += 1
-                    offsets[i] = offset
+            if self._logging:
+                streams = [
+                    (ps.streams.debug, self._log_record),
+                    (ps.streams.error, self._log_record),
+                    (ps.streams.information, self._log_record),
+                    (ps.streams.progress, self._log_record),
+                    (ps.streams.verbose, self._log_record),
+                    (ps.streams.warning, self._log_record),
+                ]
+                offsets = [0 for _ in streams]
+
+                # We're using polling to make sure output and streams are
+                # handled while the process is running.
+                while ps.state == PSInvocationState.RUNNING:
+                    ps.poll_invoke(timeout=self._operation_timeout)
+
+                    for (i, (stream, handler)) in enumerate(streams):
+                        offset = offsets[i]
+                        while len(stream) > offset:
+                            handler(stream[offset])
+                            offset += 1
+                        offsets[i] = offset
 
             # For good measure, we'll make sure the process has
-            # stopped running.
+            # stopped running in any case.
             ps.end_invoke()
 
+            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
             if ps.streams.error:
                 raise AirflowException("Process had one or more errors")
+        finally:
+            if local_context:
+                self.__exit__(None, None, None)
 
-            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
-            return ps
+    def invoke_cmdlet(self, name: str, use_local_scope=None, **parameters: Dict[str, str]) -> PowerShell:
+        """Invoke a PowerShell cmdlet and return session."""
+        with self.invoke() as ps:
+            ps.add_cmdlet(name, use_local_scope=use_local_scope)
+            ps.add_parameters(parameters)
+        return ps
 
-    def _log_output(self, message: str):
-        self.log.info("%s", message)
+    def invoke_powershell(self, script: str) -> PowerShell:
+        """Invoke a PowerShell script and return session."""
+        with self.invoke() as ps:
+            ps.add_script(script)
+        return ps
 
     def _log_record(self, record):
-        # TODO: Consider translating some or all of these records into
-        # normal logging levels, using `log(level, msg, *args)`.
-        if isinstance(record, ErrorRecord):
-            self.log.info("Error: %s", record)
-            return
-
-        if isinstance(record, InformationRecord):
-            self.log.info("Information: %s", record.message_data)
-            return
-
-        if isinstance(record, ProgressRecord):
+        message_type = getattr(record, "MESSAGE_TYPE", None)
+
+        # There seems to be a problem with some record types; we'll assume
+        # that the class name matches a message type.
+        if message_type is None:
+            message_type = getattr(
+                MessageType, re.sub('(?!^)([A-Z]+)', r'_\1', type(record).__name__).upper()
+            )
+
+        if message_type == MessageType.ERROR_RECORD:
+            self.log.info("%s: %s", record.reason, record)
+            if record.script_stacktrace:
+                for trace in record.script_stacktrace.split('\r\n'):

Review comment:
       Actually, I think "\r\n" is the correct thing here – it is Windows after all, so we don't really need the bigger arsenal of line-endings in `str.splitlines()`.
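
       For reference, a short sketch of where the two approaches agree and where they differ. The sample strings are made up for illustration and are not real PowerShell stack traces.

```python
# Pure CRLF input, which is what a Windows host is expected to send.
trace = "at Foo, script.ps1: line 3\r\nat <ScriptBlock>, <No file>: line 1"
crlf_split = trace.split('\r\n')
crlf_lines = trace.splitlines()

# Mixed line endings: split('\r\n') leaves the bare "\n" inside an entry,
# while splitlines() breaks on every line boundary it recognises.
mixed = "first\nsecond\r\nthird"
mixed_split = mixed.split('\r\n')
mixed_lines = mixed.splitlines()

# Trailing terminator: split() yields a final empty string,
# splitlines() does not.
trailing = "only\r\n"
trailing_split = trailing.split('\r\n')
trailing_lines = trailing.splitlines()
```

       On pure CRLF input the two are interchangeable; the choice only matters if the remote side ever emits bare `\n` or a trailing terminator.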







[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773662117



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.
+
+    :param psrp_conn_id: Required. The name of the PSRP connection.
+    :type psrp_conn_id: str
+    :param logging: If true (default), log command output and streams during execution.
+    :type logging: bool
+    :param operation_timeout: Override the default WSMan timeout when polling the pipeline.
+    :type operation_timeout: float
+    :param runspace_options:
+        Optional dictionary which is passed when creating the runspace pool. See
+        :py:class:`~pypsrp.powershell.RunspacePool` for a description of the
+        available options.
+    :type runspace_options: dict
+    :param wsman_options:
+        Optional dictionary which is passed when creating the `WSMan` client. See
+        :py:class:`~pypsrp.wsman.WSMan` for a description of the available options.
+    :type wsman_options: dict
+    :param exchange_keys:
+        If true (default), automatically initiate a session key exchange when the
+        hook is used as a context manager.
+    :type exchange_keys: bool
+
+    You can provide an alternative `configuration_name` using either `runspace_options`
+    or by setting this key as the extra fields of your connection.
     """
 
-    _client = None
-    _poll_interval = 1
-
-    def __init__(self, psrp_conn_id: str):
+    _conn = None
+    _configuration_name = None
+    _wsman_ref: "WeakKeyDictionary[RunspacePool, WSMan]" = WeakKeyDictionary()
+
+    def __init__(
+        self,
+        psrp_conn_id: str,
+        logging: bool = True,
+        operation_timeout: Optional[float] = None,
+        runspace_options: Optional[Dict[str, Any]] = None,
+        wsman_options: Optional[Dict[str, Any]] = None,
+        exchange_keys: bool = True,
+    ):
         self.conn_id = psrp_conn_id
+        self._logging = logging
+        self._operation_timeout = operation_timeout
+        self._runspace_options = runspace_options or {}
+        self._wsman_options = wsman_options or {}
+        self._exchange_keys = exchange_keys
 
     def __enter__(self):
-        conn = self.get_connection(self.conn_id)
-
-        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
-        self._client = WSMan(
-            conn.host,
-            ssl=True,
-            auth="ntlm",
-            encryption="never",
-            username=conn.login,
-            password=conn.password,
-            cert_validation=False,
-        )
-        self._client.__enter__()
+        conn = self.get_conn()
+        self._wsman_ref[conn].__enter__()
+        conn.__enter__()
+        if self._exchange_keys:
+            conn.exchange_keys()
+        self._conn = conn
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         try:
-            self._client.__exit__(exc_type, exc_value, traceback)
+            self._conn.__exit__(exc_type, exc_value, traceback)
+            self._wsman_ref[self._conn].__exit__(exc_type, exc_value, traceback)
         finally:
-            self._client = None
+            del self._conn
 
-    def invoke_powershell(self, script: str) -> PowerShell:
-        with RunspacePool(self._client) as pool:
-            ps = PowerShell(pool)
-            ps.add_script(script)
+    def get_conn(self) -> RunspacePool:
+        """
+        Returns a runspace pool.
+
+        The returned object must be used as a context manager.
+        """
+        conn = self.get_connection(self.conn_id)
+        self.log.info("Establishing WinRM connection %s to host: %s", self.conn_id, conn.host)
+
+        extra = conn.extra_dejson.copy()
+
+        def apply_extra(d, keys):
+            d = d.copy()
+            for key in keys:
+                value = extra.pop(key, None)
+                if value is not None:
+                    d[key] = value
+            return d
+
+        wsman_options = apply_extra(
+            self._wsman_options,
+            (
+                "auth",
+                "cert_validation",
+                "connection_timeout",
+                "locale",
+                "read_timeout",
+                "reconnection_retries",
+                "reconnection_backoff",
+                "ssl",
+            ),
+        )
+        wsman = WSMan(conn.host, username=conn.login, password=conn.password, **wsman_options)
+        runspace_options = apply_extra(self._runspace_options, ("configuration_name",))
+
+        if extra:
+            raise AirflowException(f"Unexpected extra configuration keys: {', '.join(sorted(extra))}")
+        pool = RunspacePool(wsman, **runspace_options)
+        self._wsman_ref[pool] = wsman
+        return pool
+
+    @contextmanager
+    def invoke(self) -> PowerShell:
+        """
+        Context manager that yields a PowerShell object to which commands can be
+        added. Upon exit, the commands will be invoked.
+        """
+        local_context = self._conn is None
+        if local_context:
+            self.__enter__()
+        try:
+            ps = PowerShell(self._conn)
+            yield ps
             ps.begin_invoke()
-            streams = [
-                (ps.output, self._log_output),
-                (ps.streams.debug, self._log_record),
-                (ps.streams.information, self._log_record),
-                (ps.streams.error, self._log_record),
-            ]
-            offsets = [0 for _ in streams]
-
-            # We're using polling to make sure output and streams are
-            # handled while the process is running.
-            while ps.state == PSInvocationState.RUNNING:
-                sleep(self._poll_interval)
-                ps.poll_invoke()
-
-                for (i, (stream, handler)) in enumerate(streams):
-                    offset = offsets[i]
-                    while len(stream) > offset:
-                        handler(stream[offset])
-                        offset += 1
-                    offsets[i] = offset
+            if self._logging:
+                streams = [
+                    (ps.streams.debug, self._log_record),
+                    (ps.streams.error, self._log_record),
+                    (ps.streams.information, self._log_record),
+                    (ps.streams.progress, self._log_record),
+                    (ps.streams.verbose, self._log_record),
+                    (ps.streams.warning, self._log_record),
+                ]
+                offsets = [0 for _ in streams]
+
+                # We're using polling to make sure output and streams are
+                # handled while the process is running.
+                while ps.state == PSInvocationState.RUNNING:
+                    ps.poll_invoke(timeout=self._operation_timeout)
+
+                    for (i, (stream, handler)) in enumerate(streams):
+                        offset = offsets[i]
+                        while len(stream) > offset:
+                            handler(stream[offset])
+                            offset += 1
+                        offsets[i] = offset
 
             # For good measure, we'll make sure the process has
-            # stopped running.
+            # stopped running in any case.
             ps.end_invoke()
 
+            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
             if ps.streams.error:
                 raise AirflowException("Process had one or more errors")
+        finally:
+            if local_context:
+                self.__exit__(None, None, None)
 
-            self.log.info("Invocation state: %s", str(PSInvocationState(ps.state)))
-            return ps
+    def invoke_cmdlet(self, name: str, use_local_scope=None, **parameters: Dict[str, str]) -> PowerShell:
+        """Invoke a PowerShell cmdlet and return session."""
+        with self.invoke() as ps:
+            ps.add_cmdlet(name, use_local_scope=use_local_scope)
+            ps.add_parameters(parameters)
+        return ps
 
-    def _log_output(self, message: str):
-        self.log.info("%s", message)
+    def invoke_powershell(self, script: str) -> PowerShell:
+        """Invoke a PowerShell script and return session."""
+        with self.invoke() as ps:
+            ps.add_script(script)
+        return ps
 
     def _log_record(self, record):
-        # TODO: Consider translating some or all of these records into
-        # normal logging levels, using `log(level, msg, *args)`.
-        if isinstance(record, ErrorRecord):
-            self.log.info("Error: %s", record)
-            return
-
-        if isinstance(record, InformationRecord):
-            self.log.info("Information: %s", record.message_data)
-            return
-
-        if isinstance(record, ProgressRecord):
+        message_type = getattr(record, "MESSAGE_TYPE", None)
+
+        # There seems to be a problem with some record types; we'll assume
+        # that the class name matches a message type.
+        if message_type is None:
+            message_type = getattr(
+                MessageType, re.sub('(?!^)([A-Z]+)', r'_\1', type(record).__name__).upper()
+            )

Review comment:
       I was able to get a fix for this merged into the upstream package: https://github.com/jborean93/pypsrp/pull/131.
   
   (But no release yet.) 
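
For reference, a small sketch of what the fallback in the diff above does with a record's class name. The helper name `class_name_to_message_type_name` is hypothetical; only the regular expression is taken from the diff. The hook then looks the resulting name up on `MessageType` with `getattr`.

```python
import re


def class_name_to_message_type_name(class_name: str) -> str:
    """Insert "_" before each run of capitals (except at the start), then upper-case."""
    return re.sub('(?!^)([A-Z]+)', r'_\1', class_name).upper()


for name in ("ErrorRecord", "InformationRecord", "ProgressRecord"):
    print(name, "->", class_name_to_message_type_name(name))
```

This only works as long as the class name and the message type name line up, which is the assumption stated in the code comment in the diff.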







[GitHub] [airflow] malthe commented on a change in pull request #19806: PSRP improvements

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r773660930



##########
File path: airflow/providers/microsoft/psrp/hooks/psrp.py
##########
@@ -16,103 +16,222 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from time import sleep
+import re
+from contextlib import contextmanager
+from logging import DEBUG, ERROR, INFO, WARNING
+from typing import Any, Dict, Optional
+from weakref import WeakKeyDictionary
 
-from pypsrp.messages import ErrorRecord, InformationRecord, ProgressRecord
+from pypsrp.messages import MessageType
 from pypsrp.powershell import PowerShell, PSInvocationState, RunspacePool
 from pypsrp.wsman import WSMan
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
+INFORMATIONAL_RECORD_LEVEL_MAP = {
+    MessageType.DEBUG_RECORD: DEBUG,
+    MessageType.ERROR_RECORD: ERROR,
+    MessageType.VERBOSE_RECORD: INFO,
+    MessageType.WARNING_RECORD: WARNING,
+}
+
 
 class PSRPHook(BaseHook):
     """
     Hook for PowerShell Remoting Protocol execution.
 
-    The hook must be used as a context manager.
+    When used as a context manager, the runspace pool is reused between shell
+    sessions.

Review comment:
       Someone less comfortable with context managers will presumably find the hook easier to use if it also works without one, which is one argument for supporting that. And I suppose it allows for a one-liner in simple cases.
   
   But I don't feel strongly that we need to support the non-context manager use case. It does complicate the code a little bit – but not a lot.
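
A rough sketch of the two usage styles being discussed. `ToyHook` is a hypothetical stand-in that only records events; it mirrors the `local_context` pattern from the diff but is not the real `PSRPHook` and makes no connection.

```python
from contextlib import contextmanager


class ToyHook:
    """Stand-in for the hook: records when the connection opens and closes."""

    _conn = None

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("open")
        self._conn = object()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.events.append("close")
        self._conn = None

    @contextmanager
    def invoke(self):
        # Open a connection only if the caller has not already done so.
        local_context = self._conn is None
        if local_context:
            self.__enter__()
        try:
            commands = []
            yield commands
            self.events.append("run " + ";".join(commands))
        finally:
            if local_context:
                self.__exit__(None, None, None)

    def invoke_powershell(self, script):
        with self.invoke() as commands:
            commands.append(script)
        return commands


# One-liner style: the connection is opened and closed around the single call.
one_liner = ToyHook()
one_liner.invoke_powershell("Get-Date")

# Context manager style: one connection is shared by both calls.
shared = ToyHook()
with shared:
    shared.invoke_powershell("Get-Service")
    shared.invoke_powershell("Get-Process")

print(one_liner.events)
print(shared.events)
```

The extra complexity is confined to the `local_context` check in `invoke`, which matches the "a little bit, but not a lot" assessment above.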







[GitHub] [airflow] malthe commented on a change in pull request #19806: Psrp advanced options

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #19806:
URL: https://github.com/apache/airflow/pull/19806#discussion_r756436789



##########
File path: tests/providers/microsoft/psrp/hooks/test_psrp.py
##########
@@ -29,43 +30,67 @@
 CONNECTION_ID = "conn_id"
 
 
-class TestPSRPHook(unittest.TestCase):
-    @patch(
-        f"{PSRPHook.__module__}.{PSRPHook.__name__}.get_connection",
-        return_value=Connection(
-            login='username',
-            password='password',
-            host='remote_host',
-        ),
-    )
-    @patch(f"{PSRPHook.__module__}.WSMan")
-    @patch(f"{PSRPHook.__module__}.PowerShell")
-    @patch(f"{PSRPHook.__module__}.RunspacePool")
-    @patch("logging.Logger.info")
-    def test_invoke_powershell(self, log_info, runspace_pool, powershell, ws_man, get_connection):
-        with PSRPHook(CONNECTION_ID) as hook:
-            ps = powershell.return_value = MagicMock()
-            ps.state = PSInvocationState.RUNNING
-            ps.output = []
-            ps.streams.debug = []
-            ps.streams.information = []
-            ps.streams.error = []
+def mock_powershell():

Review comment:
       I could change it to something like:
   ```python
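   from unittest.mock import MagicMock

   from pypsrp.messages import InformationRecord
   from pypsrp.powershell import PSInvocationState


   class MockPowerShellFactory(MagicMock):
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
           ps = self.return_value = MagicMock()
           ps.state = PSInvocationState.NOT_STARTED
   
           # The hook polls with poll_invoke(timeout=...), so accept that keyword.
           def poll_invoke(timeout=None):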
               ps.output.append("<output>")
               ps.streams.debug.append(MagicMock(spec=InformationRecord, message_data="<message>"))
               ps.state = PSInvocationState.COMPLETED
   
           def begin_invoke():
               ps.state = PSInvocationState.RUNNING
               ps.output = []
               ps.streams.debug = []
               ps.streams.information = []
               ps.streams.error = []
   
           def end_invoke():
               while ps.state == PSInvocationState.RUNNING:
                   poll_invoke()
               ps.streams.error = []
   
           ps.poll_invoke.side_effect = poll_invoke
           ps.begin_invoke.side_effect = begin_invoke
           ps.end_invoke.side_effect = end_invoke
   ```
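
To show how such `side_effect` functions drive a polling loop, here is a simplified, self-contained sketch. `State` is a hypothetical stand-in for pypsrp's `PSInvocationState`, and the loop imitates the hook's polling rather than calling the hook itself.

```python
from enum import Enum
from unittest.mock import MagicMock


class State(Enum):
    """Stand-in for PSInvocationState (assumption: only these three states matter here)."""

    NOT_STARTED = 0
    RUNNING = 1
    COMPLETED = 2


ps = MagicMock()
ps.state = State.NOT_STARTED


def begin_invoke():
    ps.state = State.RUNNING
    ps.output = []


def poll_invoke(timeout=None):
    # A single poll produces one output item and completes the invocation.
    ps.output.append("<output>")
    ps.state = State.COMPLETED


ps.begin_invoke.side_effect = begin_invoke
ps.poll_invoke.side_effect = poll_invoke

# Imitate the hook: start, then poll until the state leaves RUNNING.
ps.begin_invoke()
polls = 0
while ps.state == State.RUNNING:
    ps.poll_invoke(timeout=1.0)
    polls += 1

print(polls, ps.output)
```

Because the mock records its calls, a test can afterwards check how polling was done, for example with `ps.poll_invoke.assert_called_once_with(timeout=1.0)`.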



