You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/08/17 11:51:03 UTC

[incubator-ponymail-foal] 02/02: Start using plugins.elastic for archiver

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git

commit 6b9bdafd1fdab156d979d4db7cfcbe7c26f19dcc
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Aug 17 13:50:15 2020 +0200

    Start using plugins.elastic for archiver
    
    This still needs further work on the notification handling and the database names.
---
 tools/archiver.py | 83 +++++++++++++++++--------------------------------------
 1 file changed, 26 insertions(+), 57 deletions(-)

diff --git a/tools/archiver.py b/tools/archiver.py
index 19702c8..ca35e9c 100755
--- a/tools/archiver.py
+++ b/tools/archiver.py
@@ -56,12 +56,13 @@ import uuid
 
 import certifi
 import chardet
-import elasticsearch
 import formatflowed
 import netaddr
 import yaml
 
 import plugins.generators
+import plugins.elastic
+import elasticsearch
 
 # Fetch config from same dir as archiver.py
 config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "ponymail.yaml")
@@ -73,12 +74,7 @@ archiver_generator = config["archiver"].get(
 )  # Fall back to full hashing if nothing is set.
 logger = None
 ES_MAJOR = elasticsearch.VERSION[0]
-auth = None
-if config["elasticsearch"].get("user"):
-    auth = (
-        config["elasticsearch"].get("user"),
-        config["elasticsearch"].get("password"),
-    )
+
 
 # If MailMan is enabled, import and set it up
 if config.get("mailman") and config["mailman"].get("plugin"):
@@ -210,68 +206,41 @@ class Archiver(object):  # N.B. Also used by import-mbox.py
             if kwargs.pop("consistency", None):  # drop the key if present
                 if self.wait_for_active_shards:  # replace with wait if defined
                     kwargs["wait_for_active_shards"] = self.wait_for_active_shards
-        return self.es.index(**kwargs)
+        return self.elastic.index(**kwargs)
 
     def __init__(self, generator=archiver_generator, parse_html=False, dump_dir=None):
         """ Just initialize ES. """
         self.html = parse_html
         self.generator = generator
+        if dump_dir:
+            try:
+                self.elastic = plugins.elastic.Elastic()
+            except elasticsearch.exceptions.ElasticsearchException as e:
+                print(e)
+                print(
+                    "ES connection failed, but dumponfail specified, dumping to %s"
+                    % dump_dir
+                )
+        else:
+            self.elastic = plugins.elastic.Elastic()
+
         if parse_html:
             import html2text
 
             self.html2text = html2text.html2text
         self.dbname = config["elasticsearch"].get("dbname", "ponymail")
-        ssl = config["elasticsearch"].get("ssl", False)
+
         # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
         self.consistency = config["elasticsearch"].get("write", "quorum")
-        if ES_MAJOR == 2:
+        es_engine_major = self.elastic.engineMajor()
+        if es_engine_major == 2:
             pass
-        elif ES_MAJOR in [5, 6, 7]:
+        elif es_engine_major in [5, 6, 7]:
             self.wait_for_active_shards = config["elasticsearch"].get("wait", 1)
         else:
-            raise Exception("Unexpected elasticsearch version ", elasticsearch.VERSION)
+            raise Exception("Unexpected elasticsearch version ", es_engine_major)
         self.cropout = config.get("debug", {}).get("cropout")
-        uri = config["elasticsearch"].get("uri", "")
-
-        dbs = [
-            {
-                "host": config["elasticsearch"]["hostname"],
-                "port": config["elasticsearch"]["port"],
-                "use_ssl": ssl,
-                "url_prefix": uri,
-                "http_auth": auth,
-                "ca_certs": certifi.where(),
-            }
-        ]
-        # Backup ES?
-        backup = config["elasticsearch"].get("backup")
-        if backup:
-            dbs.append(
-                {
-                    "host": backup,
-                    "port": config["elasticsearch"]["port"],
-                    "use_ssl": ssl,
-                    "url_prefix": uri,
-                    "http_auth": auth,
-                    "ca_certs": certifi.where(),
-                }
-            )
-        # If we have a dump dir, we can risk failing the connection.
-        if dump_dir:
-            try:
-                self.es = elasticsearch.Elasticsearch(
-                    dbs, max_retries=5, retry_on_timeout=True
-                )
-            except elasticsearch.exceptions.ElasticsearchException as e:
-                print(e)
-                print(
-                    "ES connection failed, but dumponfail specified, dumping to %s"
-                    % dump_dir
-                )
-        else:
-            self.es = elasticsearch.Elasticsearch(
-                dbs, max_retries=5, retry_on_timeout=True
-            )
+
 
     def message_body(self, msg: email.message.Message, verbose=False, ignore_body=None):
         body = None
@@ -610,8 +579,8 @@ class Archiver(object):  # N.B. Also used by import-mbox.py
             if dm:
                 cid = dm.group(1)
                 mid = dm.group(2)
-                if self.es.exists(index=self.dbname, doc_type="account", id=cid):
-                    doc = self.es.get(index=self.dbname, doc_type="account", id=cid)
+                if self.elastic.exists(index=self.dbname, doc_type="account", id=cid):
+                    doc = self.elastic.get(index=self.dbname, doc_type="account", id=cid)
                     if doc:
                         oldrefs.append(cid)
                         # N.B. no index is supplied, so ES will generate one
@@ -644,8 +613,8 @@ class Archiver(object):  # N.B. Also used by import-mbox.py
             ):
                 cid = im.group(1)
                 mid = im.group(2)
-                if self.es.exists(index=self.dbname, doc_type="account", id=cid):
-                    doc = self.es.get(index=self.dbname, doc_type="account", id=cid)
+                if self.elastic.exists(index=self.dbname, id=cid):
+                    doc = self.elastic.get(index=self.dbname, id=cid)
 
                     # does the user want to be notified of indirect replies?
                     if (