You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/08/17 12:07:14 UTC

[incubator-ponymail-foal] branch master updated: Move elastic options into elastic.py, move archiver's dependency on ES into archive_message

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git


The following commit(s) were added to refs/heads/master by this push:
     new ce0da91  Move elastic options into elastic.py, move archiver's dependency on ES into archive_message
ce0da91 is described below

commit ce0da913de05c41908f7b4a9c6ce5d20b371e4b4
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Aug 17 14:07:01 2020 +0200

    Move elastic options into elastic.py, move archiver's dependency on ES into archive_message
    
    Also fix a typo.
---
 tools/archiver.py        | 52 ++++++++++++++++++++++++------------------------
 tools/plugins/elastic.py | 20 +++++++++++++++----
 2 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/tools/archiver.py b/tools/archiver.py
index b2b433e..8bc4f17 100755
--- a/tools/archiver.py
+++ b/tools/archiver.py
@@ -212,34 +212,11 @@ class Archiver(object):  # N.B. Also used by import-mbox.py
         """ Just initialize ES. """
         self.html = parse_html
         self.generator = generator
-        if dump_dir:
-            try:
-                self.elastic = plugins.elastic.Elastic()
-            except elasticsearch.exceptions.ElasticsearchException as e:
-                print(e)
-                print(
-                    "ES connection failed, but dumponfail specified, dumping to %s"
-                    % dump_dir
-                )
-        else:
-            self.elastic = plugins.elastic.Elastic()
-
+        self.dump_dir = dump_dir
+        self.cropout = config.get("debug", {}).get("cropout")
         if parse_html:
             import html2text
-
             self.html2text = html2text.html2text
-        self.dbname = config["elasticsearch"].get("dbname", "ponymail")
-
-        # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
-        self.consistency = config["elasticsearch"].get("write", "quorum")
-        es_engine_major = self.elastic.engineMajor()
-        if es_engine_major == 2:
-            pass
-        elif es_engine_major in [5, 6, 7]:
-            self.wait_for_active_shards = config["elasticsearch"].get("wait", 1)
-        else:
-            raise Exception("Unexpected elasticsearch version ", es_engine_major)
-        self.cropout = config.get("debug", {}).get("cropout")
 
 
     def message_body(self, msg: email.message.Message, verbose=False, ignore_body=None):
@@ -493,6 +470,29 @@ class Archiver(object):  # N.B. Also used by import-mbox.py
             print("**** Dry run, not saving message to database *****")
             return lid, ojson["mid"]
 
+        if self.dump_dir:
+            try:
+                self.elastic = plugins.elastic.Elastic()
+            except elasticsearch.exceptions.ElasticsearchException as e:
+                print(e)
+                print(
+                    "ES connection failed, but dumponfail specified, dumping to %s"
+                    % self.dump_dir
+                )
+        else:
+            self.elastic = plugins.elastic.Elastic()
+
+
+        # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
+        self.consistency = config["elasticsearch"].get("write", "quorum")
+        es_engine_major = self.elastic.engineMajor()
+        if es_engine_major == 2:
+            pass
+        elif es_engine_major in [5, 6, 7]:
+            self.wait_for_active_shards = config["elasticsearch"].get("wait", 1)
+        else:
+            raise Exception("Unexpected elasticsearch version ", es_engine_major)
+
         try:
             if contents:
                 for key in contents:
@@ -503,7 +503,7 @@ class Archiver(object):  # N.B. Also used by import-mbox.py
                     )
 
             self.index(
-                index=self.elastic.db_mbox
+                index=self.elastic.db_mbox,
                 id=ojson["mid"],
                 consistency=self.consistency,
                 body=ojson,
diff --git a/tools/plugins/elastic.py b/tools/plugins/elastic.py
index 541c0a0..896caf0 100755
--- a/tools/plugins/elastic.py
+++ b/tools/plugins/elastic.py
@@ -60,6 +60,7 @@ class Elastic:
         self.db_session = self.dbname + '-session'
         self.db_notification = self.dbname + '-notification'
         self.db_mailinglist = self.dbname + '-mailinglist'
+        self.db_version = 0
 
         ssl = config.get('elasticsearch', 'ssl', fallback=False)
         uri = config.get('elasticsearch', 'uri', fallback='')
@@ -70,6 +71,9 @@ class Elastic:
                 config.get('elasticsearch', 'password')
             )
 
+        # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
+        self.consistency = config.get("elasticsearch", "write", fallback="quorum")
+
         # elasticsearch logs lots of warnings on retries/connection failure
         logging.getLogger("elasticsearch").setLevel(logging.ERROR)
 
@@ -94,7 +98,15 @@ class Elastic:
             max_retries=5,
             retry_on_timeout=True,
         )
-        self.dbVersion = None
+
+        es_engine_major = self.engineMajor()
+        if es_engine_major == 2:
+            pass
+        elif es_engine_major in [5, 6, 7]:
+            self.wait_for_active_shards = config.get("elasticsearch", "fallback", fallback=1)
+        else:
+            raise Exception("Unexpected elasticsearch version ", es_engine_major)
+
         # Mimic ES hierarchy: es.indices.xyz()
         self.indices = _indices_wrap(self)
 
@@ -105,13 +117,13 @@ class Elastic:
         return ES_VERSION[0]
 
     def engineVersion(self):
-        if not self.dbVersion:
+        if not self.db_version:
             try:
-                self.dbVersion = self.info()["version"]["number"]
+                self.db_version = self.es.info()["version"]["number"]
             except ES_ConnectionError:
                 # default if cannot connect; allows retry
                 return "0.0.0"
-        return self.dbVersion
+        return self.db_version
 
     def engineMajor(self):
         return int(self.engineVersion().split(".")[0])