You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@superset.apache.org by ro...@apache.org on 2021/02/26 16:06:37 UTC

[superset] branch master updated: fix(celery): Reset DB connection pools for forked worker processes (#13350)

This is an automated email from the ASF dual-hosted git repository.

robdiciuccio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ca39c  fix(celery): Reset DB connection pools for forked worker processes (#13350)
b4ca39c is described below

commit b4ca39ceeba557d4a8dfaa219e62389716cf76e8
Author: Rob DiCiuccio <ro...@gmail.com>
AuthorDate: Fri Feb 26 08:05:40 2021 -0800

    fix(celery): Reset DB connection pools for forked worker processes (#13350)
    
    * Reset sqlalchemy connection pool on celery process fork
    
    * Fix race condition with async chart loading state
    
    * pylint: ignore
    
    * prettier
---
 superset-frontend/src/dashboard/index.jsx      |  4 +++-
 superset-frontend/src/explore/index.jsx        |  4 +++-
 superset-frontend/src/middleware/asyncEvent.ts |  6 ++++--
 superset/tasks/celery_app.py                   | 14 ++++++++++++--
 4 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/superset-frontend/src/dashboard/index.jsx b/superset-frontend/src/dashboard/index.jsx
index 9fe8234..78ba1d2 100644
--- a/superset-frontend/src/dashboard/index.jsx
+++ b/superset-frontend/src/dashboard/index.jsx
@@ -38,7 +38,9 @@ const initState = getInitialState(bootstrapData);
 const asyncEventMiddleware = initAsyncEvents({
   config: bootstrapData.common.conf,
   getPendingComponents: ({ charts }) =>
-    Object.values(charts).filter(c => c.chartStatus === 'loading'),
+    Object.values(charts).filter(
+      c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
+    ),
   successAction: (componentId, componentData) =>
     actions.chartUpdateSucceeded(componentData, componentId),
   errorAction: (componentId, response) =>
diff --git a/superset-frontend/src/explore/index.jsx b/superset-frontend/src/explore/index.jsx
index 83e4bc6..7698dee 100644
--- a/superset-frontend/src/explore/index.jsx
+++ b/superset-frontend/src/explore/index.jsx
@@ -40,7 +40,9 @@ const initState = getInitialState(bootstrapData);
 const asyncEventMiddleware = initAsyncEvents({
   config: bootstrapData.common.conf,
   getPendingComponents: ({ charts }) =>
-    Object.values(charts).filter(c => c.chartStatus === 'loading'),
+    Object.values(charts).filter(
+      c => c.chartStatus === 'loading' && c.asyncJobId !== undefined,
+    ),
   successAction: (componentId, componentData) =>
     actions.chartUpdateSucceeded(componentData, componentId),
   errorAction: (componentId, response) =>
diff --git a/superset-frontend/src/middleware/asyncEvent.ts b/superset-frontend/src/middleware/asyncEvent.ts
index 20caad9..1beb997 100644
--- a/superset-frontend/src/middleware/asyncEvent.ts
+++ b/superset-frontend/src/middleware/asyncEvent.ts
@@ -119,8 +119,7 @@ const initAsyncEvents = (options: AsyncEventOptions) => {
     };
 
     const processEvents = async () => {
-      const state = store.getState();
-      const queuedComponents = getPendingComponents(state);
+      let queuedComponents = getPendingComponents(store.getState());
       const eventArgs = lastReceivedEventId
         ? { last_id: lastReceivedEventId }
         : {};
@@ -128,6 +127,9 @@ const initAsyncEvents = (options: AsyncEventOptions) => {
       if (queuedComponents && queuedComponents.length) {
         try {
           const { result: events } = await fetchEvents(eventArgs);
+          // refetch queuedComponents due to race condition where results are available
+          // before component state is updated with asyncJobId
+          queuedComponents = getPendingComponents(store.getState());
           if (events && events.length) {
             const componentsByJobId = queuedComponents.reduce((acc, item) => {
               acc[item.asyncJobId] = item;
diff --git a/superset/tasks/celery_app.py b/superset/tasks/celery_app.py
index d84273f..f8b9bef 100644
--- a/superset/tasks/celery_app.py
+++ b/superset/tasks/celery_app.py
@@ -19,13 +19,16 @@
 This is the main entrypoint used by Celery workers. As such,
 it needs to call create_app() in order to initialize things properly
 """
+from typing import Any
+
+from celery.signals import worker_process_init
 
 # Superset framework imports
 from superset import create_app
-from superset.extensions import celery_app
+from superset.extensions import celery_app, db
 
 # Init the Flask app / configure everything
-create_app()
+flask_app = create_app()
 
 # Need to import late, as the celery_app will have been setup by "create_app()"
 # pylint: disable=wrong-import-position, unused-import
@@ -33,3 +36,10 @@ from . import cache, schedules, scheduler  # isort:skip
 
 # Export the celery app globally for Celery (as run on the cmd line) to find
 app = celery_app
+
+
+@worker_process_init.connect
+def reset_db_connection_pool(**kwargs: Any) -> None:  # pylint: disable=unused-argument
+    with flask_app.app_context():
+        # https://docs.sqlalchemy.org/en/14/core/connections.html#engine-disposal
+        db.engine.dispose()