You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@airflow.apache.org by as...@apache.org on 2018/05/25 08:53:52 UTC

[1/2] incubator-airflow git commit: [AIRFLOW-1730] Unpickle value of XCom queried from DB

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c97ad4363 -> ba84b6f4a


[AIRFLOW-1730] Unpickle value of XCom queried from DB


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c6deeb2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c6deeb2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c6deeb2f

Branch: refs/heads/master
Commit: c6deeb2ff453ba50ff75c315d34110f6f4a886a5
Parents: e4e7b55
Author: Shintaro Murakami <mr...@gmail.com>
Authored: Tue Oct 17 18:40:19 2017 +0900
Committer: Shintaro Murakami <mr...@gmail.com>
Committed: Fri May 25 15:22:09 2018 +0900

----------------------------------------------------------------------
 airflow/models.py | 61 +++++++++++++++++++-------------------------------
 tests/models.py   | 59 +++++++++++++++++++++++++++++++-----------------
 2 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6deeb2f/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index da18ec7..2fd05cb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4397,6 +4397,24 @@ class XCom(Base, LoggingMixin):
         Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False),
     )
 
+    """
+    TODO: "pickling" has been deprecated and JSON is preferred.
+          "pickling" will be removed in Airflow 2.0.
+    """
+    @reconstructor
+    def init_on_load(self):
+        enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
+        if enable_pickling:
+            self.value = pickle.loads(self.value)
+        else:
+            try:
+                self.value = json.loads(self.value.decode('UTF-8'))
+            except (UnicodeEncodeError, ValueError):
+                # For backward-compatibility.
+                # Preventing errors in webserver
+                # due to XComs mixed with pickled and unpickled.
+                self.value = pickle.loads(self.value)
+
     def __repr__(self):
         return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
             key=self.key,
@@ -4412,23 +4430,16 @@ class XCom(Base, LoggingMixin):
             execution_date,
             task_id,
             dag_id,
-            enable_pickling=None,
             session=None):
         """
         Store an XCom value.
-        TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be
-        removed in Airflow 2.0. :param enable_pickling: If pickling is not enabled, the
-        XCOM value will be parsed as JSON instead.
-
+        TODO: "pickling" has been deprecated and JSON is preferred.
+              "pickling" will be removed in Airflow 2.0.
         :return: None
         """
         session.expunge_all()
 
-        if enable_pickling is None:
-            enable_pickling = configuration.conf.getboolean(
-                'core', 'enable_xcom_pickling'
-            )
-
+        enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
         if enable_pickling:
             value = pickle.dumps(value)
         else:
@@ -4469,15 +4480,11 @@ class XCom(Base, LoggingMixin):
                 task_id=None,
                 dag_id=None,
                 include_prior_dates=False,
-                enable_pickling=None,
                 session=None):
         """
         Retrieve an XCom value, optionally meeting certain criteria.
         TODO: "pickling" has been deprecated and JSON is preferred.
               "pickling" will be removed in Airflow 2.0.
-
-        :param enable_pickling: If pickling is not enabled,
-                                the XCOM value will be parsed to JSON instead.
         :return: XCom value
         """
         filters = []
@@ -4498,11 +4505,7 @@ class XCom(Base, LoggingMixin):
 
         result = query.first()
         if result:
-            if enable_pickling is None:
-                enable_pickling = configuration.conf.getboolean(
-                    'core', 'enable_xcom_pickling'
-                )
-
+            enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
             if enable_pickling:
                 return pickle.loads(result.value)
             else:
@@ -4510,7 +4513,7 @@ class XCom(Base, LoggingMixin):
                     return json.loads(result.value.decode('UTF-8'))
                 except ValueError:
                     log = LoggingMixin().log
-                    log.error("Could not serialize the XCOM value into JSON. "
+                    log.error("Could not deserialize the XCOM value from JSON. "
                               "If you are using pickles instead of JSON "
                               "for XCOM, then you need to enable pickle "
                               "support for XCOM in your airflow config.")
@@ -4525,7 +4528,6 @@ class XCom(Base, LoggingMixin):
                  dag_ids=None,
                  include_prior_dates=False,
                  limit=100,
-                 enable_pickling=None,
                  session=None):
         """
         Retrieve an XCom value, optionally meeting certain criteria
@@ -4549,23 +4551,6 @@ class XCom(Base, LoggingMixin):
                               .order_by(cls.execution_date.desc(), cls.timestamp.desc())
                               .limit(limit))
         results = query.all()
-        if enable_pickling is None:
-            enable_pickling = configuration.conf.getboolean(
-                'core', 'enable_xcom_pickling'
-            )
-        for result in results:
-            if enable_pickling:
-                result.value = pickle.loads(result.value)
-            else:
-                try:
-                    result.value = json.loads(result.value.decode('UTF-8'))
-                except ValueError:
-                    log = LoggingMixin().log
-                    log.error("Could not serialize the XCOM value into JSON. "
-                              "If you are using pickles instead of JSON "
-                              "for XCOM, then you need to enable pickle "
-                              "support for XCOM in your airflow config.")
-                    raise
         return results
 
     @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6deeb2f/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 6d70e8b..4381e43 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -2233,24 +2233,34 @@ class ClearTasksTest(unittest.TestCase):
         self.assertEqual(ti2.max_tries, 1)
 
     def test_xcom_disable_pickle_type(self):
+        configuration.load_test_config()
+
         json_obj = {"key": "value"}
         execution_date = timezone.utcnow()
         key = "xcom_test1"
         dag_id = "test_dag1"
         task_id = "test_task1"
 
+        configuration.set("core", "enable_xcom_pickling", "False")
+
         XCom.set(key=key,
                  value=json_obj,
                  dag_id=dag_id,
                  task_id=task_id,
-                 execution_date=execution_date,
-                 enable_pickling=False)
+                 execution_date=execution_date)
 
         ret_value = XCom.get_one(key=key,
-                 dag_id=dag_id,
-                 task_id=task_id,
-                 execution_date=execution_date,
-                 enable_pickling=False)
+                                 dag_id=dag_id,
+                                 task_id=task_id,
+                                 execution_date=execution_date)
+
+        self.assertEqual(ret_value, json_obj)
+
+        session = settings.Session()
+        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+                                               XCom.task_id == task_id,
+                                               XCom.execution_date == execution_date
+                                               ).first().value
 
         self.assertEqual(ret_value, json_obj)
 
@@ -2261,18 +2271,26 @@ class ClearTasksTest(unittest.TestCase):
         dag_id = "test_dag2"
         task_id = "test_task2"
 
+        configuration.set("core", "enable_xcom_pickling", "True")
+
         XCom.set(key=key,
                  value=json_obj,
                  dag_id=dag_id,
                  task_id=task_id,
-                 execution_date=execution_date,
-                 enable_pickling=True)
+                 execution_date=execution_date)
 
         ret_value = XCom.get_one(key=key,
-                 dag_id=dag_id,
-                 task_id=task_id,
-                 execution_date=execution_date,
-                 enable_pickling=True)
+                                 dag_id=dag_id,
+                                 task_id=task_id,
+                                 execution_date=execution_date)
+
+        self.assertEqual(ret_value, json_obj)
+
+        session = settings.Session()
+        ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+                                               XCom.task_id == task_id,
+                                               XCom.execution_date == execution_date
+                                               ).first().value
 
         self.assertEqual(ret_value, json_obj)
 
@@ -2280,13 +2298,15 @@ class ClearTasksTest(unittest.TestCase):
         class PickleRce(object):
             def __reduce__(self):
                 return (os.system, ("ls -alt",))
+
+        configuration.set("core", "xcom_enable_pickling", "False")
+
         self.assertRaises(TypeError, XCom.set,
                           key="xcom_test3",
                           value=PickleRce(),
                           dag_id="test_dag3",
                           task_id="test_task3",
-                          execution_date=timezone.utcnow(),
-                          enable_pickling=False)
+                          execution_date=timezone.utcnow())
 
     def test_xcom_get_many(self):
         json_obj = {"key": "value"}
@@ -2297,23 +2317,22 @@ class ClearTasksTest(unittest.TestCase):
         dag_id2 = "test_dag5"
         task_id2 = "test_task5"
 
+        configuration.set("core", "xcom_enable_pickling", "True")
+
         XCom.set(key=key,
                  value=json_obj,
                  dag_id=dag_id1,
                  task_id=task_id1,
-                 execution_date=execution_date,
-                 enable_pickling=True)
+                 execution_date=execution_date)
 
         XCom.set(key=key,
                  value=json_obj,
                  dag_id=dag_id2,
                  task_id=task_id2,
-                 execution_date=execution_date,
-                 enable_pickling=True)
+                 execution_date=execution_date)
 
         results = XCom.get_many(key=key,
-                                execution_date=execution_date,
-                                enable_pickling=True)
+                                execution_date=execution_date)
 
         for result in results:
             self.assertEqual(result.value, json_obj)


[2/2] incubator-airflow git commit: Merge pull request #2701 from mrkm4ntr/airflow-1730

Posted by as...@apache.org.
Merge pull request #2701 from mrkm4ntr/airflow-1730


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ba84b6f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ba84b6f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ba84b6f4

Branch: refs/heads/master
Commit: ba84b6f4a984f0a36c6491ddbae14fc491af31c6
Parents: c97ad43 c6deeb2
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Fri May 25 09:53:51 2018 +0100
Committer: Ash Berlin-Taylor <as...@firemirror.com>
Committed: Fri May 25 09:53:51 2018 +0100

----------------------------------------------------------------------
 airflow/models.py | 61 +++++++++++++++++++-------------------------------
 tests/models.py   | 59 +++++++++++++++++++++++++++++++-----------------
 2 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ba84b6f4/airflow/models.py
----------------------------------------------------------------------