You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by yj...@apache.org on 2020/10/13 20:30:26 UTC
[incubator-superset] branch master updated: perf: speed up uuid
column generation (#11209)
This is an automated email from the ASF dual-hosted git repository.
yjc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new d7eb1d4 perf: speed up uuid column generation (#11209)
d7eb1d4 is described below
commit d7eb1d476f6e2ffbc500c613e319b1e49a456a5a
Author: Jesse Yang <je...@airbnb.com>
AuthorDate: Tue Oct 13 13:29:49 2020 -0700
perf: speed up uuid column generation (#11209)
---
...b56500de1855_add_uuid_column_to_import_mixin.py | 142 +++++++++++++++------
1 file changed, 101 insertions(+), 41 deletions(-)
diff --git a/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py b/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py
index 19903b9..bffa983 100644
--- a/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py
+++ b/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py
@@ -22,12 +22,18 @@ Create Date: 2020-09-28 17:57:23.128142
"""
import json
-import logging
-import uuid
+import os
+import time
+from json.decoder import JSONDecodeError
+from uuid import uuid4
import sqlalchemy as sa
from alembic import op
+from sqlalchemy.dialects.mysql.base import MySQLDialect
+from sqlalchemy.dialects.postgresql.base import PGDialect
+from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import load_only
from sqlalchemy_utils import UUIDType
from superset import db
@@ -43,7 +49,7 @@ Base = declarative_base()
class ImportMixin:
id = sa.Column(sa.Integer, primary_key=True)
- uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid.uuid4)
+ uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid4)
table_names = [
@@ -71,26 +77,56 @@ models = {
models["dashboards"].position_json = sa.Column(utils.MediumText())
+default_batch_size = int(os.environ.get("BATCH_SIZE", 200))
-def add_uuids(objects, session, batch_size=100):
- uuid_map = {}
- count = len(objects)
- for i, object_ in enumerate(objects):
- object_.uuid = uuid.uuid4()
- uuid_map[object_.id] = object_.uuid
- session.merge(object_)
- if (i + 1) % batch_size == 0:
- session.commit()
- print(f"uuid assigned to {i + 1} out of {count}")
+# Add uuids directly using built-in SQL uuid function
+add_uuids_by_dialect = {
+ MySQLDialect: """UPDATE %s SET uuid = UNHEX(REPLACE(uuid(), "-", ""));""",
+ PGDialect: """UPDATE %s SET uuid = uuid_in(md5(random()::text || clock_timestamp()::text)::cstring);""",
+}
- session.commit()
- print(f"Done! Assigned {count} uuids")
- return uuid_map
+def add_uuids(table_name, session, batch_size=default_batch_size):
+ """Populate columns with pre-computed uuids"""
+ bind = op.get_bind()
+ objects_query = session.query(models[table_name])
+ count = objects_query.count()
+
+ # silently skip if the table is empty (suitable for db initialization)
+ if count == 0:
+ return
+
+ print(f"\nAdding uuids for `{table_name}`...")
+ start_time = time.time()
+
+ # Use dialect specific native SQL queries if possible
+ for dialect, sql in add_uuids_by_dialect.items():
+ if isinstance(bind.dialect, dialect):
+ op.execute(sql % table_name)
+ print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
+ return
+
+    # Otherwise use Python uuid function
+ start = 0
+ while start < count:
+ end = min(start + batch_size, count)
+ for obj, uuid in map(lambda obj: (obj, uuid4()), objects_query[start:end]):
+ obj.uuid = uuid
+ session.merge(obj)
+ session.commit()
+ if start + batch_size < count:
+ print(f" uuid assigned to {end} out of {count}\r", end="")
+ start += batch_size
+
+ print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
def update_position_json(dashboard, session, uuid_map):
- layout = json.loads(dashboard.position_json or "{}")
+ try:
+ layout = json.loads(dashboard.position_json or "{}")
+ except JSONDecodeError:
+ layout = {}
+
for object_ in layout.values():
if (
isinstance(object_, dict)
@@ -105,37 +141,63 @@ def update_position_json(dashboard, session, uuid_map):
dashboard.position_json = json.dumps(layout, indent=4)
session.merge(dashboard)
+
+
+def update_dashboards(session, uuid_map):
+ message = (
+        "Updating dashboard position json with slice uuid.."
+ if uuid_map
+ else "Cleaning up slice uuid from dashboard position json.."
+ )
+ print(f"\n{message}\r", end="")
+
+ query = session.query(models["dashboards"])
+ dashboard_count = query.count()
+ for i, dashboard in enumerate(query.all()):
+ update_position_json(dashboard, session, uuid_map)
+ if i and i % default_batch_size == 0:
+ session.commit()
+ print(f"{message} {i+1}/{dashboard_count}\r", end="")
+
session.commit()
+ # Extra whitespace to override very long numbers, e.g. 99999/99999.
+ print(f"{message} Done. \n")
def upgrade():
bind = op.get_bind()
session = db.Session(bind=bind)
- uuid_maps = {}
- for table_name, model in models.items():
- with op.batch_alter_table(table_name) as batch_op:
- batch_op.add_column(
- sa.Column(
- "uuid",
- UUIDType(binary=True),
- primary_key=False,
- default=uuid.uuid4,
+ for table_name in models.keys():
+ try:
+ with op.batch_alter_table(table_name) as batch_op:
+ batch_op.add_column(
+ sa.Column(
+ "uuid", UUIDType(binary=True), primary_key=False, default=uuid4,
+ ),
)
- )
+ except OperationalError:
+            # ignore column update errors so that we can run upgrade multiple times
+ pass
- # populate column
- objects = session.query(model).all()
- uuid_maps[table_name] = add_uuids(objects, session)
+ add_uuids(table_name, session)
- # add uniqueness constraint
- with op.batch_alter_table(table_name) as batch_op:
- batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"])
+ try:
+ # add uniqueness constraint
+ with op.batch_alter_table(table_name) as batch_op:
+                # batch mode is required for sqlite
+ batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"])
+ except OperationalError:
+ pass
# add UUID to Dashboard.position_json
- Dashboard = models["dashboards"]
- for dashboard in session.query(Dashboard).all():
- update_position_json(dashboard, session, uuid_maps["slices"])
+ slice_uuid_map = {
+ slc.id: slc.uuid
+ for slc in session.query(models["slices"])
+ .options(load_only("id", "uuid"))
+ .all()
+ }
+ update_dashboards(session, slice_uuid_map)
def downgrade():
@@ -143,12 +205,10 @@ def downgrade():
session = db.Session(bind=bind)
# remove uuid from position_json
- Dashboard = models["dashboards"]
- for dashboard in session.query(Dashboard).all():
- update_position_json(dashboard, session, {})
+ update_dashboards(session, {})
# remove uuid column
for table_name, model in models.items():
- with op.batch_alter_table(model) as batch_op:
- batch_op.drop_constraint(f"uq_{table_name}_uuid")
+ with op.batch_alter_table(table_name) as batch_op:
+ batch_op.drop_constraint(f"uq_{table_name}_uuid", type_="unique")
batch_op.drop_column("uuid")