You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/10/19 16:28:38 UTC
incubator-airflow git commit: [AIRFLOW-1330] Add conn_type argument
to CLI when adding connection
Repository: incubator-airflow
Updated Branches:
refs/heads/master b464d23a6 -> 2f107d8a3
[AIRFLOW-1330] Add conn_type argument to CLI when adding connection
Closes #2525 from mrkm4ntr/airflow-1330
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2f107d8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2f107d8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2f107d8a
Branch: refs/heads/master
Commit: 2f107d8a30910fd025774004d5c4c95407ed55c5
Parents: b464d23
Author: Shintaro Murakami <mr...@gmail.com>
Authored: Thu Oct 19 09:28:09 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Thu Oct 19 09:28:23 2017 -0700
----------------------------------------------------------------------
airflow/bin/cli.py | 64 ++++++++++++++++++++++++++++++++++++++++++-------
tests/core.py | 51 ++++++++++++++++++++++++++++++---------
2 files changed, 95 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f107d8a/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a24ed5e..3943aeb 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -40,6 +40,7 @@ import traceback
import time
import psutil
import re
+from urllib.parse import urlunparse
import airflow
from airflow import api
@@ -931,11 +932,15 @@ def version(args): # noqa
print(settings.HEADER + " v" + airflow.__version__)
+alternative_conn_specs = ['conn_type', 'conn_host',
+ 'conn_login', 'conn_password', 'conn_schema', 'conn_port']
+
+
def connections(args):
if args.list:
# Check that no other flags were passed to the command
invalid_args = list()
- for arg in ['conn_id', 'conn_uri', 'conn_extra']:
+ for arg in ['conn_id', 'conn_uri', 'conn_extra'] + alternative_conn_specs:
if getattr(args, arg) is not None:
invalid_args.append(arg)
if invalid_args:
@@ -960,7 +965,7 @@ def connections(args):
if args.delete:
# Check that only the `conn_id` arg was passed to the command
invalid_args = list()
- for arg in ['conn_uri', 'conn_extra']:
+ for arg in ['conn_uri', 'conn_extra'] + alternative_conn_specs:
if getattr(args, arg) is not None:
invalid_args.append(arg)
if invalid_args:
@@ -1004,16 +1009,32 @@ def connections(args):
if args.add:
# Check that the conn_id and conn_uri args were passed to the command:
missing_args = list()
- for arg in ['conn_id', 'conn_uri']:
- if getattr(args, arg) is None:
- missing_args.append(arg)
+ invalid_args = list()
+ if not args.conn_id:
+ missing_args.append('conn_id')
+ if args.conn_uri:
+ for arg in alternative_conn_specs:
+ if getattr(args, arg) is not None:
+ invalid_args.append(arg)
+ elif not args.conn_type:
+ missing_args.append('conn_uri or conn_type')
if missing_args:
msg = ('\n\tThe following args are required to add a connection:' +
' {missing!r}\n'.format(missing=missing_args))
print(msg)
+ if invalid_args:
+ msg = ('\n\tThe following args are not compatible with the ' +
+ '--add flag and --conn_uri flag: {invalid!r}\n')
+ msg = msg.format(invalid=invalid_args)
+ print(msg)
+ if missing_args or invalid_args:
return
- new_conn = Connection(conn_id=args.conn_id, uri=args.conn_uri)
+ if args.conn_uri:
+ new_conn = Connection(conn_id=args.conn_id, uri=args.conn_uri)
+ else:
+ new_conn = Connection(conn_id=args.conn_id, conn_type=args.conn_type, host=args.conn_host,
+ login=args.conn_login, password=args.conn_password, schema=args.conn_schema, port=args.conn_port)
if args.conn_extra is not None:
new_conn.set_extra(args.conn_extra)
@@ -1024,7 +1045,8 @@ def connections(args):
session.add(new_conn)
session.commit()
msg = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
- msg = msg.format(conn_id=new_conn.conn_id, uri=args.conn_uri)
+ msg = msg.format(conn_id=new_conn.conn_id, uri=args.conn_uri or urlunparse((args.conn_type, '{login}:{password}@{host}:{port}'.format(
+ login=args.conn_login or '', password=args.conn_password or '', host=args.conn_host or '', port=args.conn_port or ''), args.conn_schema or '', '', '', '')))
print(msg)
else:
msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
@@ -1420,7 +1442,31 @@ class CLIFactory(object):
type=str),
'conn_uri': Arg(
('--conn_uri',),
- help='Connection URI, required to add a connection',
+ help='Connection URI, required to add a connection without conn_type',
+ type=str),
+ 'conn_type': Arg(
+ ('--conn_type',),
+ help='Connection type, required to add a connection without conn_uri',
+ type=str),
+ 'conn_host': Arg(
+ ('--conn_host',),
+ help='Connection host, optional when adding a connection',
+ type=str),
+ 'conn_login': Arg(
+ ('--conn_login',),
+ help='Connection login, optional when adding a connection',
+ type=str),
+ 'conn_password': Arg(
+ ('--conn_password',),
+ help='Connection password, optional when adding a connection',
+ type=str),
+ 'conn_schema': Arg(
+ ('--conn_schema',),
+ help='Connection schema, optional when adding a connection',
+ type=str),
+ 'conn_port': Arg(
+ ('--conn_port',),
+ help='Connection port, optional when adding a connection',
type=str),
'conn_extra': Arg(
('--conn_extra',),
@@ -1558,7 +1604,7 @@ class CLIFactory(object):
'func': connections,
'help': "List/Add/Delete connections",
'args': ('list_connections', 'add_connection', 'delete_connection',
- 'conn_id', 'conn_uri', 'conn_extra'),
+ 'conn_id', 'conn_uri', 'conn_extra') + tuple(alternative_conn_specs),
},
)
subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f107d8a/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0c94137..513ed45 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1136,15 +1136,17 @@ class CliTests(unittest.TestCase):
with mock.patch('sys.stdout',
new_callable=six.StringIO) as mock_stdout:
cli.connections(self.parser.parse_args(
- ['connections', '--list', '--conn_id=fake',
- '--conn_uri=fake-uri']))
+ ['connections', '--list', '--conn_id=fake', '--conn_uri=fake-uri',
+ '--conn_type=fake-type', '--conn_host=fake_host',
+ '--conn_login=fake_login', '--conn_password=fake_password',
+ '--conn_schema=fake_schema', '--conn_port=fake_port', '--conn_extra=fake_extra']))
stdout = mock_stdout.getvalue()
# Check list attempt stdout
lines = [l for l in stdout.split('\n') if len(l) > 0]
self.assertListEqual(lines, [
("\tThe following args are not compatible with the " +
- "--list flag: ['conn_id', 'conn_uri']"),
+ "--list flag: ['conn_id', 'conn_uri', 'conn_extra', 'conn_type', 'conn_host', 'conn_login', 'conn_password', 'conn_schema', 'conn_port']"),
])
def test_cli_connections_add_delete(self):
@@ -1164,6 +1166,14 @@ class CliTests(unittest.TestCase):
cli.connections(self.parser.parse_args(
['connections', '-a', '--conn_id=new4',
'--conn_uri=%s' % uri, '--conn_extra', "{'extra': 'yes'}"]))
+ cli.connections(self.parser.parse_args(
+ ['connections', '--add', '--conn_id=new5',
+ '--conn_type=hive_metastore', '--conn_login=airflow',
+ '--conn_password=airflow', '--conn_host=host',
+ '--conn_port=9083', '--conn_schema=airflow']))
+ cli.connections(self.parser.parse_args(
+ ['connections', '-a', '--conn_id=new6',
+ '--conn_uri', "", '--conn_type=google_cloud_platform', '--conn_extra', "{'extra': 'yes'}"]))
stdout = mock_stdout.getvalue()
# Check addition stdout
@@ -1177,6 +1187,10 @@ class CliTests(unittest.TestCase):
"postgresql://airflow:airflow@host:5432/airflow"),
("\tSuccessfully added `conn_id`=new4 : " +
"postgresql://airflow:airflow@host:5432/airflow"),
+ ("\tSuccessfully added `conn_id`=new5 : " +
+ "hive_metastore://airflow:airflow@host:9083/airflow"),
+ ("\tSuccessfully added `conn_id`=new6 : " +
+ "google_cloud_platform://:@:")
])
# Attempt to add duplicate
@@ -1218,7 +1232,7 @@ class CliTests(unittest.TestCase):
lines = [l for l in stdout.split('\n') if len(l) > 0]
self.assertListEqual(lines, [
("\tThe following args are required to add a connection:" +
- " ['conn_uri']"),
+ " ['conn_uri or conn_type']"),
])
# Prepare to add connections
@@ -1229,15 +1243,23 @@ class CliTests(unittest.TestCase):
'new4': "{'extra': 'yes'}"}
# Add connections
- for conn_id in ['new1', 'new2', 'new3', 'new4']:
+ for index in range(1, 6):
+ conn_id = 'new%s' % index
result = (session
.query(models.Connection)
.filter(models.Connection.conn_id == conn_id)
.first())
result = (result.conn_id, result.conn_type, result.host,
result.port, result.get_extra())
- self.assertEqual(result, (conn_id, 'postgres', 'host', 5432,
- extra[conn_id]))
+ if conn_id in ['new1', 'new2', 'new3', 'new4']:
+ self.assertEqual(result, (conn_id, 'postgres', 'host', 5432,
+ extra[conn_id]))
+ elif conn_id == 'new5':
+ self.assertEqual(result, (conn_id, 'hive_metastore', 'host',
+ 9083, None))
+ elif conn_id == 'new6':
+ self.assertEqual(result, (conn_id, 'google_cloud_platform',
+ None, None, "{'extra': 'yes'}"))
# Delete connections
with mock.patch('sys.stdout',
@@ -1250,6 +1272,10 @@ class CliTests(unittest.TestCase):
['connections', '--delete', '--conn_id=new3']))
cli.connections(self.parser.parse_args(
['connections', '--delete', '--conn_id=new4']))
+ cli.connections(self.parser.parse_args(
+ ['connections', '--delete', '--conn_id=new5']))
+ cli.connections(self.parser.parse_args(
+ ['connections', '--delete', '--conn_id=new6']))
stdout = mock_stdout.getvalue()
# Check deletion stdout
@@ -1258,11 +1284,14 @@ class CliTests(unittest.TestCase):
"\tSuccessfully deleted `conn_id`=new1",
"\tSuccessfully deleted `conn_id`=new2",
"\tSuccessfully deleted `conn_id`=new3",
- "\tSuccessfully deleted `conn_id`=new4"
+ "\tSuccessfully deleted `conn_id`=new4",
+ "\tSuccessfully deleted `conn_id`=new5",
+ "\tSuccessfully deleted `conn_id`=new6"
])
# Check deletions
- for conn_id in ['new1', 'new2', 'new3', 'new4']:
+ for index in range(1, 7):
+ conn_id = 'new%s' % index
result = (session
.query(models.Connection)
.filter(models.Connection.conn_id == conn_id)
@@ -1288,14 +1317,14 @@ class CliTests(unittest.TestCase):
new_callable=six.StringIO) as mock_stdout:
cli.connections(self.parser.parse_args(
['connections', '--delete', '--conn_id=fake',
- '--conn_uri=%s' % uri]))
+ '--conn_uri=%s' % uri, '--conn_type=fake-type']))
stdout = mock_stdout.getvalue()
# Check deletion attempt stdout
lines = [l for l in stdout.split('\n') if len(l) > 0]
self.assertListEqual(lines, [
("\tThe following args are not compatible with the " +
- "--delete flag: ['conn_uri']"),
+ "--delete flag: ['conn_uri', 'conn_type']"),
])
session.close()