Posted to commits@superset.apache.org by jo...@apache.org on 2022/10/13 00:12:11 UTC

[superset] branch master updated: fix(migration): Ensure the paginated update is deterministic (#21778)

This is an automated email from the ASF dual-hosted git repository.

johnbodley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 88a89c9fd6 fix(migration): Ensure the paginated update is deterministic (#21778)
88a89c9fd6 is described below

commit 88a89c9fd683b50d8a81754199fba6dbb4c7bef3
Author: John Bodley <45...@users.noreply.github.com>
AuthorDate: Wed Oct 12 17:11:59 2022 -0700

    fix(migration): Ensure the paginated update is deterministic (#21778)
---
 superset/migrations/shared/utils.py                | 31 ++++++++++++++--------
 ...59_7fb8bca906d2_permalink_rename_filterstate.py |  3 ---
 2 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/superset/migrations/shared/utils.py b/superset/migrations/shared/utils.py
index 14987ea0b4..e05b1d357f 100644
--- a/superset/migrations/shared/utils.py
+++ b/superset/migrations/shared/utils.py
@@ -100,22 +100,31 @@ def paginated_update(
     """
     Update models in small batches so we don't have to load everything in memory.
     """
-    start = 0
-    count = query.count()
+
+    total = query.count()
+    processed = 0
     session: Session = inspect(query).session
+    result = session.execute(query)
+
     if print_page_progress is None or print_page_progress is True:
-        print_page_progress = lambda current, total: print(
-            f"    {current}/{total}", end="\r"
+        print_page_progress = lambda processed, total: print(
+            f"    {processed}/{total}", end="\r"
         )
-    while start < count:
-        end = min(start + batch_size, count)
-        for obj in query[start:end]:
-            yield obj
-            session.merge(obj)
+
+    while True:
+        rows = result.fetchmany(batch_size)
+
+        if not rows:
+            break
+
+        for row in rows:
+            yield row[0]
+
         session.commit()
+        processed += len(rows)
+
         if print_page_progress:
-            print_page_progress(end, count)
-        start += batch_size
+            print_page_progress(processed, total)
 
 
 def try_load_json(data: Optional[str]) -> Dict[str, Any]:
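
For readers skimming the patch: the old loop re-ran the query once per page via query[start:end], i.e. a fresh LIMIT/OFFSET statement after every commit, so rows could shift across page boundaries and be skipped or yielded twice, especially without a stable ORDER BY. The new version executes the query once and drains the result with fetchmany(), so the set of rows to process is fixed up front. Below is a rough, self-contained sketch of that pattern (assuming SQLAlchemy 1.4 and an in-memory SQLite database; the Item model, the batched_update name, and the batch size are illustrative, not part of this commit):

from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    __tablename__ = "item"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))


def batched_update(query, batch_size=2):
    # Mirrors the fetchmany() approach above: execute the query once,
    # yield ORM objects batch by batch, and commit after each batch.
    session = inspect(query).session
    result = session.execute(query)

    while True:
        rows = result.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            yield row[0]
        session.commit()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Item(name=f"item-{i}") for i in range(5)])
    session.commit()

    for item in batched_update(session.query(Item)):
        item.name = item.name.upper()  # flushed by the per-batch commit

    print([item.name for item in session.query(Item)])  # all names upper-cased
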
diff --git a/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py b/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py
index ecd424d12a..0b76404dc9 100644
--- a/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py
+++ b/superset/migrations/versions/2022-06-27_14-59_7fb8bca906d2_permalink_rename_filterstate.py
@@ -66,7 +66,6 @@ def upgrade():
                 state["anchor"] = state["hash"]
                 del state["hash"]
             entry.value = pickle.dumps(value)
-    session.commit()
 
 
 def downgrade():
@@ -87,5 +86,3 @@ def downgrade():
                 state["hash"] = state["anchor"]
                 del state["anchor"]
             entry.value = pickle.dumps(value)
-        session.merge(entry)
-    session.commit()
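
The hunk above can drop the explicit session.merge() and session.commit() calls because paginated_update now commits once per batch right after yielding its rows, so a migration only has to mutate the objects it receives. A rough usage sketch in the same style (not the exact 7fb8bca906d2 code; the KeyValue stand-in, rename_key(), and the session setup are illustrative assumptions):

from alembic import op
from sqlalchemy import Column, Integer, LargeBinary
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

from superset.migrations.shared.utils import paginated_update

Base = declarative_base()


class KeyValue(Base):  # minimal stand-in for the real model
    __tablename__ = "key_value"

    id = Column(Integer, primary_key=True)
    value = Column(LargeBinary)


def rename_key(value):  # placeholder for the real pickle rewrite
    return value


def upgrade():
    bind = op.get_bind()
    session = Session(bind=bind)

    for entry in paginated_update(session.query(KeyValue)):
        # Mutate the yielded object in place; no session.merge() or
        # session.commit() needed, since paginated_update commits per batch.
        entry.value = rename_key(entry.value)
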