You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2018/05/25 08:53:52 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-1730] Unpickle value of XCom queried from DB
Repository: incubator-airflow
Updated Branches:
refs/heads/master c97ad4363 -> ba84b6f4a
[AIRFLOW-1730] Unpickle value of XCom queried from DB
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c6deeb2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c6deeb2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c6deeb2f
Branch: refs/heads/master
Commit: c6deeb2ff453ba50ff75c315d34110f6f4a886a5
Parents: e4e7b55
Author: Shintaro Murakami <mr...@gmail.com>
Authored: Tue Oct 17 18:40:19 2017 +0900
Committer: Shintaro Murakami <mr...@gmail.com>
Committed: Fri May 25 15:22:09 2018 +0900
----------------------------------------------------------------------
airflow/models.py | 61 +++++++++++++++++++-------------------------------
tests/models.py | 59 +++++++++++++++++++++++++++++++-----------------
2 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6deeb2f/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index da18ec7..2fd05cb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4397,6 +4397,24 @@ class XCom(Base, LoggingMixin):
Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False),
)
+ """
+ TODO: "pickling" has been deprecated and JSON is preferred.
+ "pickling" will be removed in Airflow 2.0.
+ """
+ @reconstructor
+ def init_on_load(self):
+ enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
+ if enable_pickling:
+ self.value = pickle.loads(self.value)
+ else:
+ try:
+ self.value = json.loads(self.value.decode('UTF-8'))
+ except (UnicodeEncodeError, ValueError):
+ # For backward-compatibility.
+ # Preventing errors in webserver
+ # due to XComs mixed with pickled and unpickled.
+ self.value = pickle.loads(self.value)
+
def __repr__(self):
return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
key=self.key,
@@ -4412,23 +4430,16 @@ class XCom(Base, LoggingMixin):
execution_date,
task_id,
dag_id,
- enable_pickling=None,
session=None):
"""
Store an XCom value.
- TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be
- removed in Airflow 2.0. :param enable_pickling: If pickling is not enabled, the
- XCOM value will be parsed as JSON instead.
-
+ TODO: "pickling" has been deprecated and JSON is preferred.
+ "pickling" will be removed in Airflow 2.0.
:return: None
"""
session.expunge_all()
- if enable_pickling is None:
- enable_pickling = configuration.conf.getboolean(
- 'core', 'enable_xcom_pickling'
- )
-
+ enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
value = pickle.dumps(value)
else:
@@ -4469,15 +4480,11 @@ class XCom(Base, LoggingMixin):
task_id=None,
dag_id=None,
include_prior_dates=False,
- enable_pickling=None,
session=None):
"""
Retrieve an XCom value, optionally meeting certain criteria.
TODO: "pickling" has been deprecated and JSON is preferred.
"pickling" will be removed in Airflow 2.0.
-
- :param enable_pickling: If pickling is not enabled,
- the XCOM value will be parsed to JSON instead.
:return: XCom value
"""
filters = []
@@ -4498,11 +4505,7 @@ class XCom(Base, LoggingMixin):
result = query.first()
if result:
- if enable_pickling is None:
- enable_pickling = configuration.conf.getboolean(
- 'core', 'enable_xcom_pickling'
- )
-
+ enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
return pickle.loads(result.value)
else:
@@ -4510,7 +4513,7 @@ class XCom(Base, LoggingMixin):
return json.loads(result.value.decode('UTF-8'))
except ValueError:
log = LoggingMixin().log
- log.error("Could not serialize the XCOM value into JSON. "
+ log.error("Could not deserialize the XCOM value from JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
"support for XCOM in your airflow config.")
@@ -4525,7 +4528,6 @@ class XCom(Base, LoggingMixin):
dag_ids=None,
include_prior_dates=False,
limit=100,
- enable_pickling=None,
session=None):
"""
Retrieve an XCom value, optionally meeting certain criteria
@@ -4549,23 +4551,6 @@ class XCom(Base, LoggingMixin):
.order_by(cls.execution_date.desc(), cls.timestamp.desc())
.limit(limit))
results = query.all()
- if enable_pickling is None:
- enable_pickling = configuration.conf.getboolean(
- 'core', 'enable_xcom_pickling'
- )
- for result in results:
- if enable_pickling:
- result.value = pickle.loads(result.value)
- else:
- try:
- result.value = json.loads(result.value.decode('UTF-8'))
- except ValueError:
- log = LoggingMixin().log
- log.error("Could not serialize the XCOM value into JSON. "
- "If you are using pickles instead of JSON "
- "for XCOM, then you need to enable pickle "
- "support for XCOM in your airflow config.")
- raise
return results
@classmethod
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6deeb2f/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 6d70e8b..4381e43 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -2233,24 +2233,34 @@ class ClearTasksTest(unittest.TestCase):
self.assertEqual(ti2.max_tries, 1)
def test_xcom_disable_pickle_type(self):
+ configuration.load_test_config()
+
json_obj = {"key": "value"}
execution_date = timezone.utcnow()
key = "xcom_test1"
dag_id = "test_dag1"
task_id = "test_task1"
+ configuration.set("core", "enable_xcom_pickling", "False")
+
XCom.set(key=key,
value=json_obj,
dag_id=dag_id,
task_id=task_id,
- execution_date=execution_date,
- enable_pickling=False)
+ execution_date=execution_date)
ret_value = XCom.get_one(key=key,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date,
- enable_pickling=False)
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ self.assertEqual(ret_value, json_obj)
+
+ session = settings.Session()
+ ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == execution_date
+ ).first().value
self.assertEqual(ret_value, json_obj)
@@ -2261,18 +2271,26 @@ class ClearTasksTest(unittest.TestCase):
dag_id = "test_dag2"
task_id = "test_task2"
+ configuration.set("core", "enable_xcom_pickling", "True")
+
XCom.set(key=key,
value=json_obj,
dag_id=dag_id,
task_id=task_id,
- execution_date=execution_date,
- enable_pickling=True)
+ execution_date=execution_date)
ret_value = XCom.get_one(key=key,
- dag_id=dag_id,
- task_id=task_id,
- execution_date=execution_date,
- enable_pickling=True)
+ dag_id=dag_id,
+ task_id=task_id,
+ execution_date=execution_date)
+
+ self.assertEqual(ret_value, json_obj)
+
+ session = settings.Session()
+ ret_value = session.query(XCom).filter(XCom.key == key, XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == execution_date
+ ).first().value
self.assertEqual(ret_value, json_obj)
@@ -2280,13 +2298,15 @@ class ClearTasksTest(unittest.TestCase):
class PickleRce(object):
def __reduce__(self):
return (os.system, ("ls -alt",))
+
+ configuration.set("core", "xcom_enable_pickling", "False")
+
self.assertRaises(TypeError, XCom.set,
key="xcom_test3",
value=PickleRce(),
dag_id="test_dag3",
task_id="test_task3",
- execution_date=timezone.utcnow(),
- enable_pickling=False)
+ execution_date=timezone.utcnow())
def test_xcom_get_many(self):
json_obj = {"key": "value"}
@@ -2297,23 +2317,22 @@ class ClearTasksTest(unittest.TestCase):
dag_id2 = "test_dag5"
task_id2 = "test_task5"
+ configuration.set("core", "xcom_enable_pickling", "True")
+
XCom.set(key=key,
value=json_obj,
dag_id=dag_id1,
task_id=task_id1,
- execution_date=execution_date,
- enable_pickling=True)
+ execution_date=execution_date)
XCom.set(key=key,
value=json_obj,
dag_id=dag_id2,
task_id=task_id2,
- execution_date=execution_date,
- enable_pickling=True)
+ execution_date=execution_date)
results = XCom.get_many(key=key,
- execution_date=execution_date,
- enable_pickling=True)
+ execution_date=execution_date)
for result in results:
self.assertEqual(result.value, json_obj)
[2/2] incubator-airflow git commit: Merge pull request #2701 from mrkm4ntr/airflow-1730
Posted by as...@apache.org.
Merge pull request #2701 from mrkm4ntr/airflow-1730
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ba84b6f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ba84b6f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ba84b6f4
Branch: refs/heads/master
Commit: ba84b6f4a984f0a36c6491ddbae14fc491af31c6
Parents: c97ad43 c6deeb2
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Fri May 25 09:53:51 2018 +0100
Committer: Ash Berlin-Taylor <as...@firemirror.com>
Committed: Fri May 25 09:53:51 2018 +0100
----------------------------------------------------------------------
airflow/models.py | 61 +++++++++++++++++++-------------------------------
tests/models.py | 59 +++++++++++++++++++++++++++++++-----------------
2 files changed, 62 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ba84b6f4/airflow/models.py
----------------------------------------------------------------------