You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/24 00:13:10 UTC

[GitHub] stale[bot] closed pull request #2763: [AIRFLOW-251] added option SQL_ALCHEMY_SCHEMA parameter

stale[bot] closed pull request #2763: [AIRFLOW-251] added option SQL_ALCHEMY_SCHEMA parameter
URL: https://github.com/apache/incubator-airflow/pull/2763
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index fd78253f18..b50aab1c8f 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -63,6 +63,10 @@ executor = SequentialExecutor
 # their website
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
 
+# The schema to use for the metadata database
+# SqlAlchemy supports databases with the concept of multiple schemas.
+sql_alchemy_schema =
+
 # The SqlAlchemy pool size is the maximum number of database connections
 # in the pool.
 sql_alchemy_pool_size = 5
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index b065313c17..5d62b050c1 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -31,6 +31,7 @@ base_log_folder = {AIRFLOW_HOME}/logs
 logging_level = INFO
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
+sql_alchemy_schema =
 load_examples = True
 donot_pickle = False
 dag_concurrency = 16
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ff81d9827b..3b5ffa804f 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -111,6 +111,7 @@ class AirflowConfigParser(ConfigParser):
     # is to not store password on boxes in text files.
     as_command_stdout = {
         ('core', 'sql_alchemy_conn'),
+        ('core', 'sql_alchemy_schema'),
         ('core', 'fernet_key'),
         ('celery', 'broker_url'),
         ('celery', 'celery_result_backend')
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 8d5e55e98a..a6f9fecba4 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -69,15 +69,21 @@ def run_migrations_online():
 
     """
     connectable = settings.engine
-
     with connectable.connect() as connection:
         context.configure(
             connection=connection,
             target_metadata=target_metadata,
+            version_table_schema=target_metadata.schema,
+            include_schemas=True,
             compare_type=COMPARE_TYPE,
         )
 
         with context.begin_transaction():
+            if target_metadata.schema and 'postgres' in settings.SQL_ALCHEMY_CONN:
+                context.execute('CREATE SCHEMA IF NOT EXISTS {}'.format(
+                    target_metadata.schema))
+                context.execute('SET search_path TO {}'.format(
+                    target_metadata.schema))
             context.run_migrations()
 
 if context.is_offline_mode():
diff --git a/airflow/models.py b/airflow/models.py
index 1686ea7f97..430e911fac 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -50,7 +50,7 @@
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
     Index, Float, LargeBinary)
-from sqlalchemy import func, or_, and_
+from sqlalchemy import func, or_, and_, MetaData
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
@@ -81,7 +81,12 @@
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-Base = declarative_base()
+SQL_ALCHEMY_SCHEMA = configuration.get('core', 'sql_alchemy_schema')
+if SQL_ALCHEMY_SCHEMA.strip():
+    Base = declarative_base(metadata=MetaData(schema=SQL_ALCHEMY_SCHEMA))
+else:
+    Base = declarative_base()
+
 ID_LEN = 250
 XCOM_RETURN_KEY = 'return_value'
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services