You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/08/17 12:07:14 UTC
[incubator-ponymail-foal] branch master updated: Move elastic options into elastic.py, move archiver's dependency on ES into archive_message
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/master by this push:
new ce0da91 Move elastic options into elastic.py, move archiver's dependency on ES into archive_message
ce0da91 is described below
commit ce0da913de05c41908f7b4a9c6ce5d20b371e4b4
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Aug 17 14:07:01 2020 +0200
Move elastic options into elastic.py, move archiver's dependency on ES into archive_message
also fix a typo
---
tools/archiver.py | 52 ++++++++++++++++++++++++------------------------
tools/plugins/elastic.py | 20 +++++++++++++++----
2 files changed, 42 insertions(+), 30 deletions(-)
diff --git a/tools/archiver.py b/tools/archiver.py
index b2b433e..8bc4f17 100755
--- a/tools/archiver.py
+++ b/tools/archiver.py
@@ -212,34 +212,11 @@ class Archiver(object): # N.B. Also used by import-mbox.py
""" Just initialize ES. """
self.html = parse_html
self.generator = generator
- if dump_dir:
- try:
- self.elastic = plugins.elastic.Elastic()
- except elasticsearch.exceptions.ElasticsearchException as e:
- print(e)
- print(
- "ES connection failed, but dumponfail specified, dumping to %s"
- % dump_dir
- )
- else:
- self.elastic = plugins.elastic.Elastic()
-
+ self.dump_dir = dump_dir
+ self.cropout = config.get("debug", {}).get("cropout")
if parse_html:
import html2text
-
self.html2text = html2text.html2text
- self.dbname = config["elasticsearch"].get("dbname", "ponymail")
-
- # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
- self.consistency = config["elasticsearch"].get("write", "quorum")
- es_engine_major = self.elastic.engineMajor()
- if es_engine_major == 2:
- pass
- elif es_engine_major in [5, 6, 7]:
- self.wait_for_active_shards = config["elasticsearch"].get("wait", 1)
- else:
- raise Exception("Unexpected elasticsearch version ", es_engine_major)
- self.cropout = config.get("debug", {}).get("cropout")
def message_body(self, msg: email.message.Message, verbose=False, ignore_body=None):
@@ -493,6 +470,29 @@ class Archiver(object): # N.B. Also used by import-mbox.py
print("**** Dry run, not saving message to database *****")
return lid, ojson["mid"]
+ if self.dump_dir:
+ try:
+ self.elastic = plugins.elastic.Elastic()
+ except elasticsearch.exceptions.ElasticsearchException as e:
+ print(e)
+ print(
+ "ES connection failed, but dumponfail specified, dumping to %s"
+ % self.dump_dir
+ )
+ else:
+ self.elastic = plugins.elastic.Elastic()
+
+
+ # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
+ self.consistency = config["elasticsearch"].get("write", "quorum")
+ es_engine_major = self.elastic.engineMajor()
+ if es_engine_major == 2:
+ pass
+ elif es_engine_major in [5, 6, 7]:
+ self.wait_for_active_shards = config["elasticsearch"].get("wait", 1)
+ else:
+ raise Exception("Unexpected elasticsearch version ", es_engine_major)
+
try:
if contents:
for key in contents:
@@ -503,7 +503,7 @@ class Archiver(object): # N.B. Also used by import-mbox.py
)
self.index(
- index=self.elastic.db_mbox
+ index=self.elastic.db_mbox,
id=ojson["mid"],
consistency=self.consistency,
body=ojson,
diff --git a/tools/plugins/elastic.py b/tools/plugins/elastic.py
index 541c0a0..896caf0 100755
--- a/tools/plugins/elastic.py
+++ b/tools/plugins/elastic.py
@@ -60,6 +60,7 @@ class Elastic:
self.db_session = self.dbname + '-session'
self.db_notification = self.dbname + '-notification'
self.db_mailinglist = self.dbname + '-mailinglist'
+ self.db_version = 0
ssl = config.get('elasticsearch', 'ssl', fallback=False)
uri = config.get('elasticsearch', 'uri', fallback='')
@@ -70,6 +71,9 @@ class Elastic:
config.get('elasticsearch', 'password')
)
+ # Always allow this to be set; will be replaced as necessary by wait_for_active_shards
+ self.consistency = config.get("elasticsearch", "write", fallback="quorum")
+
# elasticsearch logs lots of warnings on retries/connection failure
logging.getLogger("elasticsearch").setLevel(logging.ERROR)
@@ -94,7 +98,15 @@ class Elastic:
max_retries=5,
retry_on_timeout=True,
)
- self.dbVersion = None
+
+ es_engine_major = self.engineMajor()
+ if es_engine_major == 2:
+ pass
+ elif es_engine_major in [5, 6, 7]:
+ self.wait_for_active_shards = config.get("elasticsearch", "fallback", fallback=1)
+ else:
+ raise Exception("Unexpected elasticsearch version ", es_engine_major)
+
# Mimic ES hierarchy: es.indices.xyz()
self.indices = _indices_wrap(self)
@@ -105,13 +117,13 @@ class Elastic:
return ES_VERSION[0]
def engineVersion(self):
- if not self.dbVersion:
+ if not self.db_version:
try:
- self.dbVersion = self.info()["version"]["number"]
+ self.db_version = self.es.info()["version"]["number"]
except ES_ConnectionError:
# default if cannot connect; allows retry
return "0.0.0"
- return self.dbVersion
+ return self.db_version
def engineMajor(self):
return int(self.engineVersion().split(".")[0])