You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by yj...@apache.org on 2020/10/13 20:30:26 UTC

[incubator-superset] branch master updated: perf: speed up uuid column generation (#11209)

This is an automated email from the ASF dual-hosted git repository.

yjc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new d7eb1d4  perf:  speed up uuid column generation (#11209)
d7eb1d4 is described below

commit d7eb1d476f6e2ffbc500c613e319b1e49a456a5a
Author: Jesse Yang <je...@airbnb.com>
AuthorDate: Tue Oct 13 13:29:49 2020 -0700

    perf:  speed up uuid column generation (#11209)
---
 ...b56500de1855_add_uuid_column_to_import_mixin.py | 142 +++++++++++++++------
 1 file changed, 101 insertions(+), 41 deletions(-)

diff --git a/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py b/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py
index 19903b9..bffa983 100644
--- a/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py
+++ b/superset/migrations/versions/b56500de1855_add_uuid_column_to_import_mixin.py
@@ -22,12 +22,18 @@ Create Date: 2020-09-28 17:57:23.128142
 
 """
 import json
-import logging
-import uuid
+import os
+import time
+from json.decoder import JSONDecodeError
+from uuid import uuid4
 
 import sqlalchemy as sa
 from alembic import op
+from sqlalchemy.dialects.mysql.base import MySQLDialect
+from sqlalchemy.dialects.postgresql.base import PGDialect
+from sqlalchemy.exc import OperationalError
 from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import load_only
 from sqlalchemy_utils import UUIDType
 
 from superset import db
@@ -43,7 +49,7 @@ Base = declarative_base()
 
 class ImportMixin:
     id = sa.Column(sa.Integer, primary_key=True)
-    uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid.uuid4)
+    uuid = sa.Column(UUIDType(binary=True), primary_key=False, default=uuid4)
 
 
 table_names = [
@@ -71,26 +77,56 @@ models = {
 
 models["dashboards"].position_json = sa.Column(utils.MediumText())
 
+default_batch_size = int(os.environ.get("BATCH_SIZE", 200))
 
-def add_uuids(objects, session, batch_size=100):
-    uuid_map = {}
-    count = len(objects)
-    for i, object_ in enumerate(objects):
-        object_.uuid = uuid.uuid4()
-        uuid_map[object_.id] = object_.uuid
-        session.merge(object_)
-        if (i + 1) % batch_size == 0:
-            session.commit()
-            print(f"uuid assigned to {i + 1} out of {count}")
+# Add uuids directly using built-in SQL uuid function
+add_uuids_by_dialect = {
+    MySQLDialect: """UPDATE %s SET uuid = UNHEX(REPLACE(uuid(), "-", ""));""",
+    PGDialect: """UPDATE %s SET uuid = uuid_in(md5(random()::text || clock_timestamp()::text)::cstring);""",
+}
 
-    session.commit()
-    print(f"Done! Assigned {count} uuids")
 
-    return uuid_map
+def add_uuids(table_name, session, batch_size=default_batch_size):
+    """Populate columns with pre-computed uuids"""
+    bind = op.get_bind()
+    objects_query = session.query(models[table_name])
+    count = objects_query.count()
+
+    # silently skip if the table is empty (suitable for db initialization)
+    if count == 0:
+        return
+
+    print(f"\nAdding uuids for `{table_name}`...")
+    start_time = time.time()
+
+    # Use dialect specific native SQL queries if possible
+    for dialect, sql in add_uuids_by_dialect.items():
+        if isinstance(bind.dialect, dialect):
+            op.execute(sql % table_name)
+            print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
+            return
+
+    # Otherwise use Python uuid function
+    start = 0
+    while start < count:
+        end = min(start + batch_size, count)
+        for obj, uuid in map(lambda obj: (obj, uuid4()), objects_query[start:end]):
+            obj.uuid = uuid
+            session.merge(obj)
+        session.commit()
+        if start + batch_size < count:
+            print(f"  uuid assigned to {end} out of {count}\r", end="")
+        start += batch_size
+
+    print(f"Done. Assigned {count} uuids in {time.time() - start_time:.3f}s.")
 
 
 def update_position_json(dashboard, session, uuid_map):
-    layout = json.loads(dashboard.position_json or "{}")
+    try:
+        layout = json.loads(dashboard.position_json or "{}")
+    except JSONDecodeError:
+        layout = {}
+
     for object_ in layout.values():
         if (
             isinstance(object_, dict)
@@ -105,37 +141,63 @@ def update_position_json(dashboard, session, uuid_map):
 
     dashboard.position_json = json.dumps(layout, indent=4)
     session.merge(dashboard)
+
+
+def update_dashboards(session, uuid_map):
+    message = (
+        "Updating dasboard position json with slice uuid.."
+        if uuid_map
+        else "Cleaning up slice uuid from dashboard position json.."
+    )
+    print(f"\n{message}\r", end="")
+
+    query = session.query(models["dashboards"])
+    dashboard_count = query.count()
+    for i, dashboard in enumerate(query.all()):
+        update_position_json(dashboard, session, uuid_map)
+        if i and i % default_batch_size == 0:
+            session.commit()
+        print(f"{message} {i+1}/{dashboard_count}\r", end="")
+
     session.commit()
+    # Extra whitespace to override very long numbers, e.g. 99999/99999.
+    print(f"{message} Done.      \n")
 
 
 def upgrade():
     bind = op.get_bind()
     session = db.Session(bind=bind)
 
-    uuid_maps = {}
-    for table_name, model in models.items():
-        with op.batch_alter_table(table_name) as batch_op:
-            batch_op.add_column(
-                sa.Column(
-                    "uuid",
-                    UUIDType(binary=True),
-                    primary_key=False,
-                    default=uuid.uuid4,
+    for table_name in models.keys():
+        try:
+            with op.batch_alter_table(table_name) as batch_op:
+                batch_op.add_column(
+                    sa.Column(
+                        "uuid", UUIDType(binary=True), primary_key=False, default=uuid4,
+                    ),
                 )
-            )
+        except OperationalError:
+            # ignore column update errors so that we can run upgrade multiple times
+            pass
 
-        # populate column
-        objects = session.query(model).all()
-        uuid_maps[table_name] = add_uuids(objects, session)
+        add_uuids(table_name, session)
 
-        # add uniqueness constraint
-        with op.batch_alter_table(table_name) as batch_op:
-            batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"])
+        try:
+            # add uniqueness constraint
+            with op.batch_alter_table(table_name) as batch_op:
+                # batch mode is required for SQLite
+                batch_op.create_unique_constraint(f"uq_{table_name}_uuid", ["uuid"])
+        except OperationalError:
+            pass
 
     # add UUID to Dashboard.position_json
-    Dashboard = models["dashboards"]
-    for dashboard in session.query(Dashboard).all():
-        update_position_json(dashboard, session, uuid_maps["slices"])
+    slice_uuid_map = {
+        slc.id: slc.uuid
+        for slc in session.query(models["slices"])
+        .options(load_only("id", "uuid"))
+        .all()
+    }
+    update_dashboards(session, slice_uuid_map)
 
 
 def downgrade():
@@ -143,12 +205,10 @@ def downgrade():
     session = db.Session(bind=bind)
 
     # remove uuid from position_json
-    Dashboard = models["dashboards"]
-    for dashboard in session.query(Dashboard).all():
-        update_position_json(dashboard, session, {})
+    update_dashboards(session, {})
 
     # remove uuid column
     for table_name, model in models.items():
-        with op.batch_alter_table(model) as batch_op:
-            batch_op.drop_constraint(f"uq_{table_name}_uuid")
+        with op.batch_alter_table(table_name) as batch_op:
+            batch_op.drop_constraint(f"uq_{table_name}_uuid", type_="unique")
             batch_op.drop_column("uuid")