You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/03/28 20:20:55 UTC
[incubator-ponymail-foal] 03/04: Use our own db class for async
scan/scroll instead of ES' which breaks type tests
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
commit 716ae18bcb3dfcbdb1982d1039a81a5bed99e044
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Sun Mar 28 22:18:27 2021 +0200
Use our own db class for async scan/scroll instead of ES' which breaks type tests
---
server/plugins/background.py | 42 ++++++++++++------------------------------
1 file changed, 12 insertions(+), 30 deletions(-)
diff --git a/server/plugins/background.py b/server/plugins/background.py
index 0f2afac..1d8c777 100644
--- a/server/plugins/background.py
+++ b/server/plugins/background.py
@@ -5,7 +5,6 @@ import sys
import time
from elasticsearch import AsyncElasticsearch
-from elasticsearch.helpers import async_scan
from elasticsearch_dsl import Search
import plugins.configuration
@@ -32,22 +31,6 @@ class ProgTimer:
print("Done in %.2f seconds" % (time.time() - self.start))
-def es_connector(database: plugins.configuration.DBConfig) -> AsyncElasticsearch:
- if database.dburl:
- return AsyncElasticsearch([database.dburl])
- else:
- return AsyncElasticsearch(
- [
- {
- "host": database.hostname,
- "port": database.port,
- "url_prefix": database.url_prefix or "",
- "use_ssl": database.secure,
- },
- ]
- )
-
-
async def get_lists(database: plugins.configuration.DBConfig) -> dict:
"""
@@ -56,15 +39,15 @@ async def get_lists(database: plugins.configuration.DBConfig) -> dict:
public or private
"""
lists = {}
- client = es_connector(database)
+ db = plugins.database.Database(database)
# Fetch aggregations of all public emails
- s = Search(using=client, index=database.db_prefix + "-mbox").query(
+ s = Search(using=db.client, index=database.db_prefix + "-mbox").query(
"match", private=False
)
s.aggs.bucket("per_list", "terms", field="list_raw")
- res = await client.search(
+ res = await db.search(
index=database.db_prefix + "-mbox", body=s.to_dict(), size=0
)
@@ -76,12 +59,12 @@ async def get_lists(database: plugins.configuration.DBConfig) -> dict:
}
# Ditto, for private emails
- s = Search(using=client, index=database.db_prefix + "-mbox").query(
+ s = Search(using=db.client, index=database.db_prefix + "-mbox").query(
"match", private=True
)
s.aggs.bucket("per_list", "terms", field="list_raw")
- res = await client.search(
+ res = await db.search(
index=database.db_prefix + "-mbox", body=s.to_dict(), size=0
)
@@ -91,7 +74,7 @@ async def get_lists(database: plugins.configuration.DBConfig) -> dict:
"count": ml["doc_count"],
"private": True,
}
- await client.close()
+ await db.client.close()
return lists
@@ -102,11 +85,11 @@ async def get_public_activity(database: plugins.configuration.DBConfig) -> dict:
:param database: a PyPony database configuration
:return: A dictionary with activity stats
"""
- client = es_connector(database)
+ db = plugins.database.Database(database)
# Fetch aggregations of all public emails
s = (
- Search(using=client, index=database.db_prefix + "-mbox")
+ Search(using=db, index=database.db_prefix + "-mbox")
.query("match", private=False)
.filter("range", date={"lt": "now+1d", "gt": "now-14d"})
)
@@ -117,7 +100,7 @@ async def get_public_activity(database: plugins.configuration.DBConfig) -> dict:
"daily_emails", "date_histogram", field="date", calendar_interval="1d"
)
- res = await client.search(
+ res = await db.search(
index=database.db_prefix + "-mbox", body=s.to_dict(), size=0
)
@@ -134,13 +117,12 @@ async def get_public_activity(database: plugins.configuration.DBConfig) -> dict:
thread_count = 0
s = (
- Search(using=client, index=database.db_prefix + "-mbox")
+ Search(using=db.client, index=database.db_prefix + "-mbox")
.query("match", private=False)
.filter("range", date={"lt": "now+1d", "gt": "now-14d"})
)
- async for doc in async_scan(
+ async for doc in db.scan(
index=database.db_prefix + "-mbox",
- client=client,
query=s.to_dict(),
_source_includes=[
"message-id",
@@ -175,7 +157,7 @@ async def get_public_activity(database: plugins.configuration.DBConfig) -> dict:
seen_topics.append(subject)
thread_count += 1
- await client.close()
+ await db.client.close()
activity = {
"hits": no_emails,