You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/08/17 11:51:03 UTC
[incubator-ponymail-foal] 02/02: Start using plugins.elastic for archiver
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
commit 6b9bdafd1fdab156d979d4db7cfcbe7c26f19dcc
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Aug 17 13:50:15 2020 +0200
Start using plugins.elastic for archiver
This needs further work with the notifications stuff and the db names.
---
tools/archiver.py | 83 +++++++++++++++++--------------------------------------
1 file changed, 26 insertions(+), 57 deletions(-)
diff --git a/tools/archiver.py b/tools/archiver.py
index 19702c8..ca35e9c 100755
--- a/tools/archiver.py
+++ b/tools/archiver.py
@@ -56,12 +56,13 @@ import uuid
import certifi
import chardet
-import elasticsearch
import formatflowed
import netaddr
import yaml
import plugins.generators
+import plugins.elastic
+import elasticsearch
# Fetch config from same dir as archiver.py
config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "ponymail.yaml")
@@ -73,12 +74,7 @@ archiver_generator = config["archiver"].get(
) # Fall back to full hashing if nothing is set.
logger = None
ES_MAJOR = elasticsearch.VERSION[0]
-auth = None
-if config["elasticsearch"].get("user"):
- auth = (
- config["elasticsearch"].get("user"),
- config["elasticsearch"].get("password"),
- )
+
# If MailMan is enabled, import and set it up
if config.get("mailman") and config["mailman"].get("plugin"):
@@ -210,68 +206,41 @@ class Archiver(object): # N.B. Also used by import-mbox.py
if kwargs.pop("consistency", None): # drop the key if present
if self.wait_for_active_shards: # replace with wait if defined
kwargs["wait_for_active_shards"] = self.wait_for_active_shards
- return self.es.index(**kwargs)
+ return self.elastic.index(**kwargs)
def __init__(self, generator=archiver_generator, parse_html=False, dump_dir=None):
""" Just initialize ES. """
self.html = parse_html
self.generator = generator
+ if dump_dir:
+ try:
+ self.elastic = plugins.elastic.Elastic()
+ except elasticsearch.exceptions.ElasticsearchException as e:
+ print(e)
+ print(
+ "ES connection failed, but dumponfail specified, dumping to %s"
+ % dump_dir
+ )
+ else:
+ self.elastic = plugins.elastic.Elastic()
+
if parse_html:
import html2text
self.html2text = html2text.html2text
self.dbname = config["elasticsearch"].get("dbname", "ponymail")
- ssl = config["elasticsearch"].get("ssl", False)
+
# Always allow this to be set; will be replaced as necessary by wait_for_active_shards
self.consistency = config["elasticsearch"].get("write", "quorum")
- if ES_MAJOR == 2:
+ es_engine_major = self.elastic.engineMajor()
+ if es_engine_major == 2:
pass
- elif ES_MAJOR in [5, 6, 7]:
+ elif es_engine_major in [5, 6, 7]:
self.wait_for_active_shards = config["elasticsearch"].get("wait", 1)
else:
- raise Exception("Unexpected elasticsearch version ", elasticsearch.VERSION)
+ raise Exception("Unexpected elasticsearch version ", es_engine_major)
self.cropout = config.get("debug", {}).get("cropout")
- uri = config["elasticsearch"].get("uri", "")
-
- dbs = [
- {
- "host": config["elasticsearch"]["hostname"],
- "port": config["elasticsearch"]["port"],
- "use_ssl": ssl,
- "url_prefix": uri,
- "http_auth": auth,
- "ca_certs": certifi.where(),
- }
- ]
- # Backup ES?
- backup = config["elasticsearch"].get("backup")
- if backup:
- dbs.append(
- {
- "host": backup,
- "port": config["elasticsearch"]["port"],
- "use_ssl": ssl,
- "url_prefix": uri,
- "http_auth": auth,
- "ca_certs": certifi.where(),
- }
- )
- # If we have a dump dir, we can risk failing the connection.
- if dump_dir:
- try:
- self.es = elasticsearch.Elasticsearch(
- dbs, max_retries=5, retry_on_timeout=True
- )
- except elasticsearch.exceptions.ElasticsearchException as e:
- print(e)
- print(
- "ES connection failed, but dumponfail specified, dumping to %s"
- % dump_dir
- )
- else:
- self.es = elasticsearch.Elasticsearch(
- dbs, max_retries=5, retry_on_timeout=True
- )
+
def message_body(self, msg: email.message.Message, verbose=False, ignore_body=None):
body = None
@@ -610,8 +579,8 @@ class Archiver(object): # N.B. Also used by import-mbox.py
if dm:
cid = dm.group(1)
mid = dm.group(2)
- if self.es.exists(index=self.dbname, doc_type="account", id=cid):
- doc = self.es.get(index=self.dbname, doc_type="account", id=cid)
+ if self.elastic.exists(index=self.dbname, doc_type="account", id=cid):
+ doc = self.elastic.get(index=self.dbname, doc_type="account", id=cid)
if doc:
oldrefs.append(cid)
# N.B. no index is supplied, so ES will generate one
@@ -644,8 +613,8 @@ class Archiver(object): # N.B. Also used by import-mbox.py
):
cid = im.group(1)
mid = im.group(2)
- if self.es.exists(index=self.dbname, doc_type="account", id=cid):
- doc = self.es.get(index=self.dbname, doc_type="account", id=cid)
+ if self.elastic.exists(index=self.dbname, id=cid):
+ doc = self.elastic.get(index=self.dbname, id=cid)
# does the user want to be notified of indirect replies?
if (