Posted to dev@superset.apache.org by gi...@git.apache.org on 2017/09/26 06:03:18 UTC

[GitHub] xrmx commented on a change in pull request #3527: Druid refresh metadata performance improvements

URL: https://github.com/apache/incubator-superset/pull/3527#discussion_r140963729
 
 

 ##########
 File path: superset/connectors/druid/models.py
 ##########
 @@ -101,15 +108,99 @@ def get_druid_version(self):
         ).format(obj=self)
         return json.loads(requests.get(endpoint).text)['version']
 
-    def refresh_datasources(self, datasource_name=None, merge_flag=False):
+    def refresh_datasources(
+            self,
+            datasource_name=None,
+            merge_flag=True,
+            refreshAll=True):
         """Refresh metadata of all datasources in the cluster
         If ``datasource_name`` is specified, only that datasource is updated
         """
         self.druid_version = self.get_druid_version()
-        for datasource in self.get_datasources():
-            if datasource not in conf.get('DRUID_DATA_SOURCE_BLACKLIST', []):
-                if not datasource_name or datasource_name == datasource:
-                    DruidDatasource.sync_to_db(datasource, self, merge_flag)
+        ds_list = self.get_datasources()
+        blacklist = conf.get('DRUID_DATA_SOURCE_BLACKLIST', [])
+        ds_refresh = []
+        if not datasource_name:
+            ds_refresh = list(filter(lambda ds: ds not in blacklist, ds_list))
+        elif datasource_name not in blacklist and datasource_name in ds_list:
+            ds_refresh.append(datasource_name)
+        else:
+            return
+        self.refresh_async(ds_refresh, merge_flag, refreshAll)
+
+    def refresh_async(self, datasource_names, merge_flag, refreshAll):
+        """
 +        Fetches metadata for the specified datasources and
 +        merges them into the Superset database
+        """
+        session = db.session
+        ds_list = (
+            session.query(DruidDatasource)
+            .filter(or_(DruidDatasource.datasource_name == name
+                    for name in datasource_names))
+        )
+
+        ds_map = {ds.name: ds for ds in ds_list}
+        for ds_name in datasource_names:
+            datasource = ds_map.get(ds_name, None)
+            if not datasource:
+                datasource = DruidDatasource(datasource_name=ds_name)
+                with session.no_autoflush:
+                    session.add(datasource)
+                flasher(
+                    "Adding new datasource [{}]".format(ds_name), 'success')
+                ds_map[ds_name] = datasource
+            elif refreshAll:
+                flasher(
+                    "Refreshing datasource [{}]".format(ds_name), 'info')
+            else:
+                del ds_map[ds_name]
+                continue
+            datasource.cluster = self
+            datasource.merge_flag = merge_flag
+        session.flush()
+
 +        # Prepare multithreaded execution
 
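 A note on the query in this hunk: or_() does accept a generator of equality
 clauses (SQLAlchemy expands it internally), but the same membership filter
 reads more directly with in_(). A minimal sketch, reusing the session and
 model from the diff above; the datasource names are hypothetical:

     from sqlalchemy import or_

     names = ['ds_a', 'ds_b']  # hypothetical datasource names

     # Disjunction of equality clauses, as written in the diff:
     q1 = session.query(DruidDatasource).filter(
         or_(DruidDatasource.datasource_name == name for name in names))

     # Equivalent membership filter:
     q2 = session.query(DruidDatasource).filter(
         DruidDatasource.datasource_name.in_(names))
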
 Review comment:
   nit: these are processes, not threads. Q: do you think this could be offloaded to Celery instead of multiprocessing?
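
   A minimal sketch (not part of the PR) of what the reviewer suggests: hand
   each datasource refresh to a Celery worker instead of a multiprocessing
   pool. The app name, broker URL, task body, and call site below are
   illustrative assumptions, not Superset's actual implementation:

       from celery import Celery

       # Broker URL is a placeholder; any Celery-supported broker works.
       celery_app = Celery('superset_refresh',
                           broker='redis://localhost:6379/0')

       @celery_app.task
       def refresh_one_datasource(ds_name, cluster_id, merge_flag, refresh_all):
           """Hypothetical task: refresh metadata for one Druid datasource.

           The arguments mirror the PR's refresh signature.
           """
           # A worker would look up the cluster and sync the datasource, e.g.:
           # cluster = db.session.query(DruidCluster).get(cluster_id)
           # DruidDatasource.sync_to_db(ds_name, cluster, merge_flag)
           pass

       # The caller could then fan out one task per datasource and return
       # immediately, leaving the fetching and merging to the workers:
       # for name in ds_refresh:
       #     refresh_one_datasource.delay(name, cluster.id, merge_flag, refreshAll)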
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services