You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ic...@apache.org on 2021/10/29 10:05:30 UTC
svn commit: r1894611 [2/5] - in /httpd/httpd/trunk: ./ test/
test/modules/md/ test/modules/md/data/ test/modules/md/data/store_migrate/
test/modules/md/data/store_migrate/1.0/
test/modules/md/data/store_migrate/1.0/sample1/
test/modules/md/data/store_m...
Added: httpd/httpd/trunk/test/modules/md/md_env.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/md_env.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/md_env.py (added)
+++ httpd/httpd/trunk/test/modules/md/md_env.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,605 @@
+import copy
+import inspect
+import json
+import logging
+from configparser import ConfigParser, ExtendedInterpolation
+
+import pytest
+import re
+import os
+import shutil
+import subprocess
+import time
+
+from datetime import datetime, timedelta
+from typing import Dict, Optional
+
+from pyhttpd.certs import CertificateSpec
+from .md_cert_util import MDCertUtil
+from pyhttpd.env import HttpdTestSetup, HttpdTestEnv
+from pyhttpd.result import ExecResult
+
+log = logging.getLogger(__name__)
+
+
+class MDTestSetup(HttpdTestSetup):
+
+ def __init__(self, env: 'HttpdTestEnv'):
+ super().__init__(env=env)
+
+ def make(self):
+ super().make(add_modules=["proxy_connect", "md"])
+ if "pebble" == self.env.acme_server:
+ self._make_pebble_conf()
+
+ def _make_pebble_conf(self):
+ our_dir = os.path.dirname(inspect.getfile(MDTestSetup))
+ conf_src_dir = os.path.join(our_dir, 'pebble')
+ conf_dest_dir = os.path.join(self.env.gen_dir, 'pebble')
+ if not os.path.exists(conf_dest_dir):
+ os.makedirs(conf_dest_dir)
+ for name in os.listdir(conf_src_dir):
+ src_path = os.path.join(conf_src_dir, name)
+ m = re.match(r'(.+).template', name)
+ if m:
+ self._make_template(src_path, os.path.join(conf_dest_dir, m.group(1)))
+ elif os.path.isfile(src_path):
+ shutil.copy(src_path, os.path.join(conf_dest_dir, name))
+
+
+class MDTestEnv(HttpdTestEnv):
+
+ MD_S_UNKNOWN = 0
+ MD_S_INCOMPLETE = 1
+ MD_S_COMPLETE = 2
+ MD_S_EXPIRED = 3
+ MD_S_ERROR = 4
+
+ EMPTY_JOUT = {'status': 0, 'output': []}
+
+ DOMAIN_SUFFIX = "%d.org" % time.time()
+ LOG_FMT_TIGHT = '%(levelname)s: %(message)s'
+
+ @classmethod
+ def get_acme_server(cls):
+ return os.environ['ACME'] if 'ACME' in os.environ else "pebble"
+
+ @classmethod
+ def has_acme_server(cls):
+ return cls.get_acme_server() != 'none'
+
+ @classmethod
+ def has_acme_eab(cls):
+ return cls.get_acme_server() == 'pebble'
+
+ @classmethod
+ def is_pebble(cls) -> bool:
+ return cls.get_acme_server() == 'pebble'
+
+ @classmethod
+ def lacks_ocsp(cls):
+ return cls.is_pebble()
+
+ @classmethod
+ def has_a2md(cls):
+ dir = os.path.dirname(inspect.getfile(HttpdTestEnv))
+ config = ConfigParser(interpolation=ExtendedInterpolation())
+ config.read(os.path.join(dir, 'config.ini'))
+ bin_dir = config.get('global', 'bindir')
+ a2md_bin = os.path.join(bin_dir, 'a2md')
+ return os.path.isfile(a2md_bin)
+
+ def __init__(self, pytestconfig=None, setup_dirs=True):
+ super().__init__(pytestconfig=pytestconfig,
+ local_dir=os.path.dirname(inspect.getfile(MDTestEnv)),
+ interesting_modules=["md"])
+ self._acme_server = self.get_acme_server()
+ self._acme_tos = "accepted"
+ self._acme_ca_pemfile = os.path.join(self.gen_dir, "apache/acme-ca.pem")
+ if "pebble" == self._acme_server:
+ self._acme_url = "https://localhost:14000/dir"
+ self._acme_eab_url = "https://localhost:14001/dir"
+ elif "boulder" == self._acme_server:
+ self._acme_url = "http://localhost:4001/directory"
+ self._acme_eab_url = None
+ else:
+ raise Exception(f"unknown ACME server type: {self._acme_server}")
+ self._acme_server_down = False
+ self._acme_server_ok = False
+
+ self._a2md_bin = os.path.join(self.bin_dir, 'a2md')
+ self._default_domain = f"test1.{self.http_tld}"
+ self._store_dir = "./md"
+ self.set_store_dir_default()
+
+ self.add_cert_specs([
+ CertificateSpec(domains=[f"expired.{self._http_tld}"],
+ valid_from=timedelta(days=-100),
+ valid_to=timedelta(days=-10)),
+ CertificateSpec(domains=["localhost"], key_type='rsa2048'),
+ ])
+
+ if setup_dirs:
+ self._setup = MDTestSetup(env=self)
+ self._setup.make()
+ self.issue_certs()
+ self.clear_store()
+
+ def set_store_dir_default(self):
+ dirpath = "md"
+ if self.httpd_is_at_least("2.5.0"):
+ dirpath = os.path.join("state", dirpath)
+ self.set_store_dir(dirpath)
+
+ def set_store_dir(self, dirpath):
+ self._store_dir = os.path.join(self.server_dir, dirpath)
+ if self.acme_url:
+ self.a2md_stdargs([self.a2md_bin, "-a", self.acme_url, "-d", self._store_dir, "-C", self.acme_ca_pemfile, "-j"])
+ self.a2md_rawargs([self.a2md_bin, "-a", self.acme_url, "-d", self._store_dir, "-C", self.acme_ca_pemfile])
+
+ def get_apxs_var(self, name: str) -> str:
+ p = subprocess.run([self._apxs, "-q", name], capture_output=True, text=True)
+ if p.returncode != 0:
+ return ""
+ return p.stdout.strip()
+
+ @property
+ def acme_server(self):
+ return self._acme_server
+
+ @property
+ def acme_url(self):
+ return self._acme_url
+
+ @property
+ def acme_tos(self):
+ return self._acme_tos
+
+ @property
+ def a2md_bin(self):
+ return self._a2md_bin
+
+ @property
+ def acme_ca_pemfile(self):
+ return self._acme_ca_pemfile
+
+ @property
+ def store_dir(self):
+ return self._store_dir
+
+ def get_request_domain(self, request):
+ return "%s-%s" % (re.sub(r'[_]', '-', request.node.originalname), MDTestEnv.DOMAIN_SUFFIX)
+
+ def get_method_domain(self, method):
+ return "%s-%s" % (re.sub(r'[_]', '-', method.__name__.lower()), MDTestEnv.DOMAIN_SUFFIX)
+
+ def get_module_domain(self, module):
+ return "%s-%s" % (re.sub(r'[_]', '-', module.__name__.lower()), MDTestEnv.DOMAIN_SUFFIX)
+
+ def get_class_domain(self, c):
+ return "%s-%s" % (re.sub(r'[_]', '-', c.__name__.lower()), MDTestEnv.DOMAIN_SUFFIX)
+
+ # --------- cmd execution ---------
+
+ _a2md_args = []
+ _a2md_args_raw = []
+
+ def a2md_stdargs(self, args):
+ self._a2md_args = [] + args
+
+ def a2md_rawargs(self, args):
+ self._a2md_args_raw = [] + args
+
+ def a2md(self, args, raw=False) -> ExecResult:
+ preargs = self._a2md_args
+ if raw:
+ preargs = self._a2md_args_raw
+ log.debug("running: {0} {1}".format(preargs, args))
+ return self.run(preargs + args)
+
+ def check_acme(self):
+ if self._acme_server_ok:
+ return True
+ if self._acme_server_down:
+ pytest.skip(msg="ACME server not running")
+ return False
+ if self.is_live(self.acme_url, timeout=timedelta(seconds=0.5)):
+ self._acme_server_ok = True
+ return True
+ else:
+ self._acme_server_down = True
+ pytest.fail(msg="ACME server not running", pytrace=False)
+ return False
+
+ def get_ca_pem_file(self, hostname: str) -> Optional[str]:
+ pem_file = super().get_ca_pem_file(hostname)
+ if pem_file is None:
+ pem_file = self.acme_ca_pemfile
+ return pem_file
+
+ # --------- access local store ---------
+
+ def purge_store(self):
+ log.debug("purge store dir: %s" % self._store_dir)
+ assert len(self._store_dir) > 1
+ if os.path.exists(self._store_dir):
+ shutil.rmtree(self._store_dir, ignore_errors=False)
+ os.makedirs(self._store_dir)
+
+ def clear_store(self):
+ log.debug("clear store dir: %s" % self._store_dir)
+ assert len(self._store_dir) > 1
+ if not os.path.exists(self._store_dir):
+ os.makedirs(self._store_dir)
+ for dirpath in ["challenges", "tmp", "archive", "domains", "accounts", "staging", "ocsp"]:
+ shutil.rmtree(os.path.join(self._store_dir, dirpath), ignore_errors=True)
+
+ def clear_ocsp_store(self):
+ assert len(self._store_dir) > 1
+ dirpath = os.path.join(self._store_dir, "ocsp")
+ log.debug("clear ocsp store dir: %s" % dir)
+ if os.path.exists(dirpath):
+ shutil.rmtree(dirpath, ignore_errors=True)
+
+ def authz_save(self, name, content):
+ dirpath = os.path.join(self._store_dir, 'staging', name)
+ os.makedirs(dirpath)
+ open(os.path.join(dirpath, 'authz.json'), "w").write(content)
+
+ def path_store_json(self):
+ return os.path.join(self._store_dir, 'md_store.json')
+
+ def path_account(self, acct):
+ return os.path.join(self._store_dir, 'accounts', acct, 'account.json')
+
+ def path_account_key(self, acct):
+ return os.path.join(self._store_dir, 'accounts', acct, 'account.pem')
+
+ def store_domains(self):
+ return os.path.join(self._store_dir, 'domains')
+
+ def store_archives(self):
+ return os.path.join(self._store_dir, 'archive')
+
+ def store_stagings(self):
+ return os.path.join(self._store_dir, 'staging')
+
+ def store_challenges(self):
+ return os.path.join(self._store_dir, 'challenges')
+
+ def store_domain_file(self, domain, filename):
+ return os.path.join(self.store_domains(), domain, filename)
+
+ def store_archived_file(self, domain, version, filename):
+ return os.path.join(self.store_archives(), "%s.%d" % (domain, version), filename)
+
+ def store_staged_file(self, domain, filename):
+ return os.path.join(self.store_stagings(), domain, filename)
+
+ def path_fallback_cert(self, domain):
+ return os.path.join(self._store_dir, 'domains', domain, 'fallback-pubcert.pem')
+
+ def path_job(self, domain):
+ return os.path.join(self._store_dir, 'staging', domain, 'job.json')
+
+ def replace_store(self, src):
+ shutil.rmtree(self._store_dir, ignore_errors=False)
+ shutil.copytree(src, self._store_dir)
+
+ def list_accounts(self):
+ return os.listdir(os.path.join(self._store_dir, 'accounts'))
+
+ def check_md(self, domain, md=None, state=-1, ca=None, protocol=None, agreement=None, contacts=None):
+ domains = None
+ if isinstance(domain, list):
+ domains = domain
+ domain = domains[0]
+ if md:
+ domain = md
+ path = self.store_domain_file(domain, 'md.json')
+ with open(path) as f:
+ md = json.load(f)
+ assert md
+ if domains:
+ assert md['domains'] == domains
+ if state >= 0:
+ assert md['state'] == state
+ if ca:
+ assert md['ca']['url'] == ca
+ if protocol:
+ assert md['ca']['proto'] == protocol
+ if agreement:
+ assert md['ca']['agreement'] == agreement
+ if contacts:
+ assert md['contacts'] == contacts
+
+ def pkey_fname(self, pkeyspec=None):
+ if pkeyspec and not re.match(r'^rsa( ?\d+)?$', pkeyspec.lower()):
+ return "privkey.{0}.pem".format(pkeyspec.lower())
+ return 'privkey.pem'
+
+ def cert_fname(self, pkeyspec=None):
+ if pkeyspec and not re.match(r'^rsa( ?\d+)?$', pkeyspec.lower()):
+ return "pubcert.{0}.pem".format(pkeyspec.lower())
+ return 'pubcert.pem'
+
+ def check_md_complete(self, domain, pkey=None):
+ md = self.get_md_status(domain)
+ assert md
+ assert 'state' in md, "md is unexpected: {0}".format(md)
+ assert md['state'] is MDTestEnv.MD_S_COMPLETE, f"unexpected state: {md['state']}"
+ pkey_file = self.store_domain_file(domain, self.pkey_fname(pkey))
+ cert_file = self.store_domain_file(domain, self.cert_fname(pkey))
+ r = self.run(['ls', os.path.dirname(pkey_file)])
+ if not os.path.isfile(pkey_file):
+ assert False, f"pkey missing: {pkey_file}: {r.stdout}"
+ if not os.path.isfile(cert_file):
+ assert False, f"cert missing: {cert_file}: {r.stdout}"
+
+ def check_md_credentials(self, domain):
+ if isinstance(domain, list):
+ domains = domain
+ domain = domains[0]
+ else:
+ domains = [domain]
+ # check private key, validate certificate, etc
+ MDCertUtil.validate_privkey(self.store_domain_file(domain, 'privkey.pem'))
+ cert = MDCertUtil(self.store_domain_file(domain, 'pubcert.pem'))
+ cert.validate_cert_matches_priv_key(self.store_domain_file(domain, 'privkey.pem'))
+ # check SANs and CN
+ assert cert.get_cn() == domain
+ # compare lists twice in opposite directions: SAN may not respect ordering
+ san_list = list(cert.get_san_list())
+ assert len(san_list) == len(domains)
+ assert set(san_list).issubset(domains)
+ assert set(domains).issubset(san_list)
+ # check valid dates interval
+ not_before = cert.get_not_before()
+ not_after = cert.get_not_after()
+ assert not_before < datetime.now(not_before.tzinfo)
+ assert not_after > datetime.now(not_after.tzinfo)
+
+ # --------- check utilities ---------
+
+ def check_json_contains(self, actual, expected):
+ # write all expected key:value bindings to a copy of the actual data ...
+ # ... assert it stays unchanged
+ test_json = copy.deepcopy(actual)
+ test_json.update(expected)
+ assert actual == test_json
+
+ def check_file_access(self, path, exp_mask):
+ actual_mask = os.lstat(path).st_mode & 0o777
+ assert oct(actual_mask) == oct(exp_mask)
+
+ def check_dir_empty(self, path):
+ assert os.listdir(path) == []
+
+ def get_http_status(self, domain, path, use_https=True):
+ r = self.get_meta(domain, path, use_https, insecure=True)
+ return r.response['status']
+
+ def get_cert(self, domain, tls=None, ciphers=None):
+ return MDCertUtil.load_server_cert(self._httpd_addr, self.https_port,
+ domain, tls=tls, ciphers=ciphers)
+
+ def get_server_cert(self, domain, proto=None, ciphers=None):
+ args = [
+ "openssl", "s_client", "-status",
+ "-connect", "%s:%s" % (self._httpd_addr, self.https_port),
+ "-CAfile", self.acme_ca_pemfile,
+ "-servername", domain,
+ "-showcerts"
+ ]
+ if proto is not None:
+ args.extend(["-{0}".format(proto)])
+ if ciphers is not None:
+ args.extend(["-cipher", ciphers])
+ r = self.run(args)
+ # noinspection PyBroadException
+ try:
+ return MDCertUtil.parse_pem_cert(r.stdout)
+ except:
+ return None
+
+ def verify_cert_key_lenghts(self, domain, pkeys):
+ for p in pkeys:
+ cert = self.get_server_cert(domain, proto="tls1_2", ciphers=p['ciphers'])
+ if 0 == p['keylen']:
+ assert cert is None
+ else:
+ assert cert, "no cert returned for cipher: {0}".format(p['ciphers'])
+ assert cert.get_key_length() == p['keylen'], "key length, expected {0}, got {1}".format(
+ p['keylen'], cert.get_key_length()
+ )
+
+ def get_meta(self, domain, path, use_https=True, insecure=False):
+ schema = "https" if use_https else "http"
+ port = self.https_port if use_https else self.http_port
+ r = self.curl_get(f"{schema}://{domain}:{port}{path}", insecure=insecure)
+ assert r.exit_code == 0
+ assert r.response
+ assert r.response['header']
+ return r
+
+ def get_content(self, domain, path, use_https=True):
+ schema = "https" if use_https else "http"
+ port = self.https_port if use_https else self.http_port
+ r = self.curl_get(f"{schema}://{domain}:{port}{path}")
+ assert r.exit_code == 0
+ return r.stdout
+
+ def get_json_content(self, domain, path, use_https=True, insecure=False,
+ debug_log=True):
+ schema = "https" if use_https else "http"
+ port = self.https_port if use_https else self.http_port
+ url = f"{schema}://{domain}:{port}{path}"
+ r = self.curl_get(url, insecure=insecure, debug_log=debug_log)
+ if r.exit_code != 0:
+ log.error(f"curl get on {url} returned {r.exit_code}"
+ f"\nstdout: {r.stdout}"
+ f"\nstderr: {r.stderr}")
+ assert r.exit_code == 0, r.stderr
+ return r.json
+
+ def get_certificate_status(self, domain) -> Dict:
+ return self.get_json_content(domain, "/.httpd/certificate-status", insecure=True)
+
+ def get_md_status(self, domain, via_domain=None, use_https=True, debug_log=False) -> Dict:
+ if via_domain is None:
+ via_domain = self._default_domain
+ return self.get_json_content(via_domain, f"/md-status/{domain}",
+ use_https=use_https, debug_log=debug_log)
+
+ def get_server_status(self, query="/", via_domain=None, use_https=True):
+ if via_domain is None:
+ via_domain = self._default_domain
+ return self.get_content(via_domain, "/server-status%s" % query, use_https=use_https)
+
+ def await_completion(self, names, must_renew=False, restart=True, timeout=60,
+ via_domain=None, use_https=True):
+ try_until = time.time() + timeout
+ renewals = {}
+ names = names.copy()
+ while len(names) > 0:
+ if time.time() >= try_until:
+ return False
+ for name in names:
+ mds = self.get_md_status(name, via_domain=via_domain, use_https=use_https)
+ if mds is None:
+ log.debug("not managed by md: %s" % name)
+ return False
+
+ if 'renewal' in mds:
+ renewal = mds['renewal']
+ renewals[name] = True
+ if 'finished' in renewal and renewal['finished'] is True:
+ if (not must_renew) or (name in renewals):
+ log.debug(f"domain cert was renewed: {name}")
+ names.remove(name)
+
+ if len(names) != 0:
+ time.sleep(0.1)
+ if restart:
+ time.sleep(0.1)
+ return self.apache_restart() == 0
+ return True
+
+ def is_renewing(self, name):
+ stat = self.get_certificate_status(name)
+ return 'renewal' in stat
+
+ def await_renewal(self, names, timeout=60):
+ try_until = time.time() + timeout
+ while len(names) > 0:
+ if time.time() >= try_until:
+ return False
+ for name in names:
+ md = self.get_md_status(name)
+ if md is None:
+ log.debug("not managed by md: %s" % name)
+ return False
+
+ if 'renewal' in md:
+ names.remove(name)
+
+ if len(names) != 0:
+ time.sleep(0.1)
+ return True
+
+ def await_error(self, domain, timeout=60, via_domain=None, use_https=True, errors=1):
+ try_until = time.time() + timeout
+ while True:
+ if time.time() >= try_until:
+ return False
+ md = self.get_md_status(domain, via_domain=via_domain, use_https=use_https)
+ if md:
+ if 'state' in md and md['state'] == MDTestEnv.MD_S_ERROR:
+ return md
+ if 'renewal' in md and 'errors' in md['renewal'] \
+ and md['renewal']['errors'] >= errors:
+ return md
+ time.sleep(0.1)
+ return None
+
+ def await_file(self, fpath, timeout=60):
+ try_until = time.time() + timeout
+ while True:
+ if time.time() >= try_until:
+ return False
+ if os.path.isfile(fpath):
+ return True
+ time.sleep(0.1)
+
+ def check_file_permissions(self, domain):
+ dpath = os.path.join(self.store_dir, 'domains', domain)
+ assert os.path.isdir(dpath)
+ md = json.load(open(os.path.join(dpath, 'md.json')))
+ assert md
+ acct = md['ca']['account']
+ assert acct
+ self.check_file_access(self.path_store_json(), 0o600)
+ # domains
+ self.check_file_access(self.store_domains(), 0o700)
+ self.check_file_access(os.path.join(self.store_domains(), domain), 0o700)
+ self.check_file_access(self.store_domain_file(domain, 'privkey.pem'), 0o600)
+ self.check_file_access(self.store_domain_file(domain, 'pubcert.pem'), 0o600)
+ self.check_file_access(self.store_domain_file(domain, 'md.json'), 0o600)
+ # archive
+ self.check_file_access(self.store_archived_file(domain, 1, 'md.json'), 0o600)
+ # accounts
+ self.check_file_access(os.path.join(self._store_dir, 'accounts'), 0o755)
+ self.check_file_access(os.path.join(self._store_dir, 'accounts', acct), 0o755)
+ self.check_file_access(self.path_account(acct), 0o644)
+ self.check_file_access(self.path_account_key(acct), 0o644)
+ # staging
+ self.check_file_access(self.store_stagings(), 0o755)
+
+ def get_ocsp_status(self, domain, proto=None, cipher=None, ca_file=None):
+ stat = {}
+ args = [
+ "openssl", "s_client", "-status",
+ "-connect", "%s:%s" % (self._httpd_addr, self.https_port),
+ "-CAfile", ca_file if ca_file else self.acme_ca_pemfile,
+ "-servername", domain,
+ "-showcerts"
+ ]
+ if proto is not None:
+ args.extend(["-{0}".format(proto)])
+ if cipher is not None:
+ args.extend(["-cipher", cipher])
+ r = self.run(args, debug_log=False)
+ ocsp_regex = re.compile(r'OCSP response: +([^=\n]+)\n')
+ matches = ocsp_regex.finditer(r.stdout)
+ for m in matches:
+ if m.group(1) != "":
+ stat['ocsp'] = m.group(1)
+ if 'ocsp' not in stat:
+ ocsp_regex = re.compile(r'OCSP Response Status:\s*(.+)')
+ matches = ocsp_regex.finditer(r.stdout)
+ for m in matches:
+ if m.group(1) != "":
+ stat['ocsp'] = m.group(1)
+ verify_regex = re.compile(r'Verify return code:\s*(.+)')
+ matches = verify_regex.finditer(r.stdout)
+ for m in matches:
+ if m.group(1) != "":
+ stat['verify'] = m.group(1)
+ return stat
+
+ def await_ocsp_status(self, domain, timeout=10, ca_file=None):
+ try_until = time.time() + timeout
+ while True:
+ if time.time() >= try_until:
+ break
+ stat = self.get_ocsp_status(domain, ca_file=ca_file)
+ if 'ocsp' in stat and stat['ocsp'] != "no response sent":
+ return stat
+ time.sleep(0.1)
+ raise TimeoutError(f"ocsp respopnse not available: {domain}")
+
+ def create_self_signed_cert(self, name_list, valid_days, serial=1000, path=None):
+ dirpath = path
+ if not path:
+ dirpath = os.path.join(self.store_domains(), name_list[0])
+ return MDCertUtil.create_self_signed_cert(dirpath, name_list, valid_days, serial)
\ No newline at end of file
Propchange: httpd/httpd/trunk/test/modules/md/md_env.py
------------------------------------------------------------------------------
svn:executable = *
Added: httpd/httpd/trunk/test/modules/md/message.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/message.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/message.py (added)
+++ httpd/httpd/trunk/test/modules/md/message.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,26 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+
+
+def main(argv):
+ if len(argv) > 2:
+ cmd = argv[2]
+ if 'renewing' != cmd:
+ f1 = open(argv[1], 'a+')
+ f1.write(f'{argv}\n')
+ if 'MD_VERSION' in os.environ:
+ f1.write(f'MD_VERSION={os.environ["MD_VERSION"]}\n')
+ if 'MD_STORE' in os.environ:
+ f1.write(f'MD_STORE={os.environ["MD_STORE"]}\n')
+ f1.close()
+ sys.stderr.write("done, all fine.\n")
+ sys.exit(0)
+ else:
+ sys.stderr.write(f"{argv[0]} without arguments")
+ sys.exit(7)
+
+
+if __name__ == "__main__":
+ main(sys.argv)
Propchange: httpd/httpd/trunk/test/modules/md/message.py
------------------------------------------------------------------------------
svn:executable = *
Added: httpd/httpd/trunk/test/modules/md/msg_fail_on.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/msg_fail_on.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/msg_fail_on.py (added)
+++ httpd/httpd/trunk/test/modules/md/msg_fail_on.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,28 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+
+
+def main(argv):
+ if len(argv) > 3:
+ log = argv[1]
+ fail_on = argv[2]
+ cmd = argv[3]
+ domain = argv[4]
+ if 'renewing' != cmd:
+ f1 = open(log, 'a+')
+ f1.write(f"{[argv[0], log, cmd, domain]}\n")
+ f1.close()
+ if cmd.startswith(fail_on):
+ sys.stderr.write(f"failing on: {cmd}\n")
+ sys.exit(1)
+ sys.stderr.write("done, all fine.\n")
+ sys.exit(0)
+ else:
+ sys.stderr.write("%s without arguments" % (argv[0]))
+ sys.exit(7)
+
+
+if __name__ == "__main__":
+ main(sys.argv)
Propchange: httpd/httpd/trunk/test/modules/md/msg_fail_on.py
------------------------------------------------------------------------------
svn:executable = *
Added: httpd/httpd/trunk/test/modules/md/notifail.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/notifail.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/notifail.py (added)
+++ httpd/httpd/trunk/test/modules/md/notifail.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,16 @@
+#!/usr/bin/env python3
+
+import sys
+
+
+def main(argv):
+ if len(argv) > 1:
+ msg = argv[2] if len(argv) > 2 else None
+ # fail on later messaging stages, not the initial 'renewing' one.
+ # we have test_901_030 that check that later stages are not invoked
+ # when misconfigurations are detected early.
+ sys.exit(1 if msg != "renewing" else 0)
+
+
+if __name__ == "__main__":
+ main(sys.argv)
Propchange: httpd/httpd/trunk/test/modules/md/notifail.py
------------------------------------------------------------------------------
svn:executable = *
Added: httpd/httpd/trunk/test/modules/md/notify.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/notify.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/notify.py (added)
+++ httpd/httpd/trunk/test/modules/md/notify.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,18 @@
+#!/usr/bin/env python3
+
+import sys
+
+
+def main(argv):
+ if len(argv) > 2:
+ with open(argv[1], 'a+') as f1:
+ f1.write(f"{argv}\n")
+ sys.stderr.write("done, all fine.\n")
+ sys.exit(0)
+ else:
+ sys.stderr.write(f"{argv[0]} without arguments")
+ sys.exit(7)
+
+
+if __name__ == "__main__":
+ main(sys.argv)
Propchange: httpd/httpd/trunk/test/modules/md/notify.py
------------------------------------------------------------------------------
svn:executable = *
Added: httpd/httpd/trunk/test/modules/md/pebble/pebble-eab.json.template
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/pebble/pebble-eab.json.template?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/pebble/pebble-eab.json.template (added)
+++ httpd/httpd/trunk/test/modules/md/pebble/pebble-eab.json.template Fri Oct 29 10:05:29 2021
@@ -0,0 +1,16 @@
+{
+ "pebble": {
+ "listenAddress": "0.0.0.0:14000",
+ "managementListenAddress": "0.0.0.0:15000",
+ "certificate": "${server_dir}/ca/localhost.rsa2048.cert.pem",
+ "privateKey": "${server_dir}/ca/localhost.rsa2048.pkey.pem",
+ "httpPort": ${http_port},
+ "tlsPort": ${https_port},
+ "ocspResponderURL": "",
+ "externalAccountBindingRequired": true,
+ "externalAccountMACKeys": {
+ "kid-1": "zWNDZM6eQGHWpSRTPal5eIUYFTu7EajVIoguysqZ9wG44nMEtx3MUAsUDkMTQ12W",
+ "kid-2": "b10lLJs8l1GPIzsLP0s6pMt8O0XVGnfTaCeROxQM0BIt2XrJMDHJZBM5NuQmQJQH"
+ }
+ }
+}
Added: httpd/httpd/trunk/test/modules/md/pebble/pebble.json.template
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/pebble/pebble.json.template?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/pebble/pebble.json.template (added)
+++ httpd/httpd/trunk/test/modules/md/pebble/pebble.json.template Fri Oct 29 10:05:29 2021
@@ -0,0 +1,12 @@
+{
+ "pebble": {
+ "listenAddress": "0.0.0.0:14000",
+ "managementListenAddress": "0.0.0.0:15000",
+ "certificate": "${server_dir}/ca/localhost.rsa2048.cert.pem",
+ "privateKey": "${server_dir}/ca/localhost.rsa2048.pkey.pem",
+ "httpPort": ${http_port},
+ "tlsPort": ${https_port},
+ "ocspResponderURL": "",
+ "externalAccountBindingRequired": false
+ }
+}
Added: httpd/httpd/trunk/test/modules/md/test_001_store.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_001_store.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_001_store.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_001_store.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,213 @@
+# test the mod_md store handling of the a2md command line tool
+
+import re
+
+import pytest
+
+from .md_env import MDTestEnv
+
+
+def md_name(md):
+ return md['name']
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
+class TestStore:
+
+ @pytest.fixture(autouse=True, scope='function')
+ def _method_scope(self, env):
+ env.purge_store()
+
+ # verify expected binary version
+ def test_md_001_001(self, env: MDTestEnv):
+ r = env.run([env.a2md_bin, "-V"])
+ m = re.match(r'version: (\d+\.\d+\.\d+)(-git)?$', r.stdout)
+ assert m, f"expected version info in '{r.stdout}'"
+
+ # verify that store is clean
+ def test_md_001_002(self, env: MDTestEnv):
+ r = env.run(["find", env.store_dir])
+ assert re.match(env.store_dir, r.stdout)
+
+ # test case: add a single dns managed domain
+ def test_md_001_100(self, env: MDTestEnv):
+ dns = "greenbytes.de"
+ env.check_json_contains(
+ env.a2md(["store", "add", dns]).json['output'][0],
+ {
+ "name": dns,
+ "domains": [dns],
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": 0
+ })
+
+ # test case: add > 1 dns managed domain
+ def test_md_001_101(self, env: MDTestEnv):
+ dns = ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ env.check_json_contains(
+ env.a2md(["store", "add"] + dns).json['output'][0],
+ {
+ "name": dns[0],
+ "domains": dns,
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": 0
+ })
+
+ # test case: add second managed domain
+ def test_md_001_102(self, env: MDTestEnv):
+ dns1 = ["test000-102.com", "test000-102a.com", "test000-102b.com"]
+ assert env.a2md(["store", "add"] + dns1).exit_code == 0
+ #
+ # add second managed domain
+ dns2 = ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ jout = env.a2md(["store", "add"] + dns2).json
+ # assert: output covers only changed md
+ assert len(jout['output']) == 1
+ env.check_json_contains(jout['output'][0], {
+ "name": dns2[0],
+ "domains": dns2,
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": 0
+ })
+
+ # test case: add existing domain
+ def test_md_001_103(self, env: MDTestEnv):
+ dns = "greenbytes.de"
+ assert env.a2md(["store", "add", dns]).exit_code == 0
+ # add same domain again
+ assert env.a2md(["store", "add", dns]).exit_code == 1
+
+ # test case: add without CA URL
+ def test_md_001_104(self, env: MDTestEnv):
+ dns = "greenbytes.de"
+ args = [env.a2md_bin, "-d", env.store_dir, "-j", "store", "add", dns]
+ jout = env.run(args).json
+ assert len(jout['output']) == 1
+ env.check_json_contains(jout['output'][0], {
+ "name": dns,
+ "domains": [dns],
+ "contacts": [],
+ "ca": {
+ "proto": "ACME"
+ },
+ "state": 0
+ })
+
+ # test case: list empty store
+ def test_md_001_200(self, env: MDTestEnv):
+ assert env.a2md(["store", "list"]).json == env.EMPTY_JOUT
+
+ # test case: list two managed domains
+ def test_md_001_201(self, env: MDTestEnv):
+ domains = [
+ ["test000-201.com", "test000-201a.com", "test000-201b.com"],
+ ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ ]
+ for dns in domains:
+ assert env.a2md(["store", "add"] + dns).exit_code == 0
+ #
+ # list all store content
+ jout = env.a2md(["store", "list"]).json
+ assert len(jout['output']) == len(domains)
+ domains.reverse()
+ jout['output'] = sorted(jout['output'], key=md_name)
+ for i in range(0, len(jout['output'])):
+ env.check_json_contains(jout['output'][i], {
+ "name": domains[i][0],
+ "domains": domains[i],
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": 0
+ })
+
+ # test case: remove managed domain
+ def test_md_001_300(self, env: MDTestEnv):
+ dns = "test000-300.com"
+ assert env.a2md(["store", "add", dns]).exit_code == 0
+ assert env.a2md(["store", "remove", dns]).json == env.EMPTY_JOUT
+ assert env.a2md(["store", "list"]).json == env.EMPTY_JOUT
+
+ # test case: remove from list of managed domains
+ def test_md_001_301(self, env: MDTestEnv):
+ dns1 = ["test000-301.com", "test000-301a.com", "test000-301b.com"]
+ assert env.a2md(["store", "add"] + dns1).exit_code == 0
+ #
+ dns2 = ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ jout1 = env.a2md(["store", "add"] + dns2).json
+ # remove managed domain
+ assert env.a2md(["store", "remove", "test000-301.com"]).json == env.EMPTY_JOUT
+ # list store content
+ assert env.a2md(["store", "list"]).json == jout1
+
+ # test case: remove nonexisting managed domain
+ def test_md_001_302(self, env: MDTestEnv):
+ dns1 = "test000-302.com"
+ r = env.a2md(["store", "remove", dns1])
+ assert r.exit_code == 1
+ assert r.json == {
+ 'status': 2, 'description': 'No such file or directory', 'output': []
+ }
+
+ # test case: force remove nonexisting managed domain
+ def test_md_001_303(self, env: MDTestEnv):
+ dns1 = "test000-303.com"
+ assert env.a2md(["store", "remove", "-f", dns1]).json == env.EMPTY_JOUT
+
+ # test case: null change
+ def test_md_001_400(self, env: MDTestEnv):
+ dns = "test000-400.com"
+ r1 = env.a2md(["store", "add", dns])
+ assert env.a2md(["store", "update", dns]).json == r1.json
+
+ # test case: add dns to managed domain
+ def test_md_001_401(self, env: MDTestEnv):
+ dns1 = "test000-401.com"
+ env.a2md(["store", "add", dns1])
+ dns2 = "test-101.com"
+ args = ["store", "update", dns1, "domains", dns1, dns2]
+ assert env.a2md(args).json['output'][0]['domains'] == [dns1, dns2]
+
+ # test case: change CA URL
+ def test_md_001_402(self, env: MDTestEnv):
+ dns = "test000-402.com"
+ args = ["store", "add", dns]
+ assert env.a2md(args).json['output'][0]['ca']['url'] == env.acme_url
+ nurl = "https://foo.com/"
+ args = [env.a2md_bin, "-a", nurl, "-d", env.store_dir, "-j", "store", "update", dns]
+ assert env.run(args).json['output'][0]['ca']['url'] == nurl
+
+ # test case: update nonexisting managed domain
+ def test_md_001_403(self, env: MDTestEnv):
+ dns = "test000-403.com"
+ assert env.a2md(["store", "update", dns]).exit_code == 1
+
+ # test case: update domains, throw away md name
+ def test_md_001_404(self, env: MDTestEnv):
+ dns1 = "test000-404.com"
+ dns2 = "greenbytes.com"
+ args = ["store", "add", dns1]
+ assert env.a2md(args).json['output'][0]['domains'] == [dns1]
+ # override domains list
+ args = ["store", "update", dns1, "domains", dns2]
+ assert env.a2md(args).json['output'][0]['domains'] == [dns2]
+
+ # test case: update domains with empty dns list
+ def test_md_001_405(self, env: MDTestEnv):
+ dns1 = "test000-405.com"
+ assert env.a2md(["store", "add", dns1]).exit_code == 0
+ assert env.a2md(["store", "update", dns1, "domains"]).exit_code == 1
Added: httpd/httpd/trunk/test/modules/md/test_010_store_migrate.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_010_store_migrate.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_010_store_migrate.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_010_store_migrate.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,43 @@
+# test mod_md store migration from the 1.0 store file layout
+
+import os
+import pytest
+
+from .md_conf import MDConf
+from .md_env import MDTestEnv
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
+class TestStoreMigrate:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env):
+ MDConf(env).install()
+ assert env.apache_restart() == 0
+
+ # install old store, start a2md list, check files afterwards
+ def test_md_010_000(self, env):
+ domain = "7007-1502285564.org"
+ env.replace_store(os.path.join(env.test_dir, "../modules/md/data/store_migrate/1.0/sample1"))
+ #
+ # use 1.0 file name for private key
+ fpkey_1_0 = os.path.join(env.store_dir, 'domains', domain, 'pkey.pem')
+ fpkey_1_1 = os.path.join(env.store_dir, 'domains', domain, 'privkey.pem')
+ cert_1_0 = os.path.join(env.store_dir, 'domains', domain, 'cert.pem')
+ cert_1_1 = os.path.join(env.store_dir, 'domains', domain, 'pubcert.pem')
+ chain_1_0 = os.path.join(env.store_dir, 'domains', domain, 'chain.pem')
+ #
+ assert os.path.exists(fpkey_1_0)
+ assert os.path.exists(cert_1_0)
+ assert os.path.exists(chain_1_0)
+ assert not os.path.exists(fpkey_1_1)
+ assert not os.path.exists(cert_1_1)
+ #
+ md = env.a2md(["-vvv", "list", domain]).json['output'][0]
+ assert domain == md["name"]
+ #
+ assert not os.path.exists(fpkey_1_0)
+ assert os.path.exists(cert_1_0)
+ assert os.path.exists(chain_1_0)
+ assert os.path.exists(fpkey_1_1)
+ assert os.path.exists(cert_1_1)
Added: httpd/httpd/trunk/test/modules/md/test_100_reg_add.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_100_reg_add.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_100_reg_add.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_100_reg_add.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,152 @@
+# test a2md 'add' command for managed domains
+
+import pytest
+
+from .md_env import MDTestEnv
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
+@pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(),
+ reason="no ACME test server configured")
+class TestRegAdd:
+
+ @pytest.fixture(autouse=True, scope='function')
+ def _method_scope(self, env):
+ env.purge_store()
+
+ # test case: add a single dns managed domain
+ def test_md_100_000(self, env):
+ dns = "greenbytes.de"
+ jout1 = env.a2md(["add", dns]).json
+ env.check_json_contains(jout1['output'][0], {
+ "name": dns,
+ "domains": [dns],
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+ assert env.a2md(["list"]).json == jout1
+
+ # test case: add > 1 dns managed domain
+ def test_md_100_001(self, env):
+ dns = ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ jout1 = env.a2md(["add"] + dns).json
+ env.check_json_contains(jout1['output'][0], {
+ "name": dns[0],
+ "domains": dns,
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+ assert env.a2md(["list"]).json == jout1
+
+ # test case: add second managed domain
+ def test_md_100_002(self, env):
+ dns1 = ["test100-002.com", "test100-002a.com", "test100-002b.com"]
+ env.a2md(["add"] + dns1)
+ # add second managed domain
+ dns2 = ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ jout = env.a2md(["add"] + dns2).json
+ # assert: output covers only changed md
+ assert len(jout['output']) == 1
+ env.check_json_contains(jout['output'][0], {
+ "name": dns2[0],
+ "domains": dns2,
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+ assert len(env.a2md(["list"]).json['output']) == 2
+
+ # test case: add existing domain
+ def test_md_100_003(self, env):
+ dns = "greenbytes.de"
+ assert env.a2md(["add", dns]).exit_code == 0
+ assert env.a2md(["add", dns]).exit_code == 1
+
+ # test case: add without CA URL
+ def test_md_100_004(self, env):
+ dns = "greenbytes.de"
+ jout1 = env.run([env.a2md_bin, "-d", env.store_dir, "-j", "add", dns]).json
+ assert len(jout1['output']) == 1
+ env.check_json_contains(jout1['output'][0], {
+ "name": dns,
+ "domains": [dns],
+ "contacts": [],
+ "ca": {
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+ assert env.a2md(["list"]).json == jout1
+
+ # test case: add with invalid DNS
+ @pytest.mark.parametrize("invalid_dns", [
+ "tld", "white sp.ace", "invalid.*.wildcard.com", "k\xc3ller.idn.com"
+ ])
+ def test_md_100_005(self, env, invalid_dns):
+ assert env.a2md(["add", invalid_dns]).exit_code == 1
+ assert env.a2md(["add", "test-100.de", invalid_dns]).exit_code == 1
+
+ # test case: add with invalid ACME URL
+ @pytest.mark.parametrize("invalid_url", [
+ "no.schema/path", "http://white space/path", "http://bad.port:-1/path"])
+ def test_md_100_006(self, env, invalid_url):
+ args = [env.a2md_bin, "-a", invalid_url, "-d", env.store_dir, "-j"]
+ dns = "greenbytes.de"
+ args.extend(["add", dns])
+ assert env.run(args).exit_code == 1
+
+ # test case: add overlapping dns names
+ def test_md_100_007(self, env):
+ assert env.a2md(["add", "test-100.com", "test-101.com"]).exit_code == 0
+ # 1: alternate DNS exists as primary name
+ assert env.a2md(["add", "greenbytes2.de", "test-100.com"]).exit_code == 1
+ # 2: alternate DNS exists as alternate DNS
+ assert env.a2md(["add", "greenbytes2.de", "test-101.com"]).exit_code == 1
+ # 3: primary name exists as alternate DNS
+ assert env.a2md(["add", "test-101.com"]).exit_code == 1
+
+ # test case: add subdomains as separate managed domain
+ def test_md_100_008(self, env):
+ assert env.a2md(["add", "test-100.com"]).exit_code == 0
+ assert env.a2md(["add", "sub.test-100.com"]).exit_code == 0
+
+ # test case: add duplicate domain
+ def test_md_100_009(self, env):
+ dns1 = "test-100.com"
+ dns2 = "test-101.com"
+ jout = env.a2md(["add", dns1, dns2, dns1, dns2]).json
+ # DNS is only listed once
+ assert len(jout['output']) == 1
+ md = jout['output'][0]
+ assert md['domains'] == [dns1, dns2]
+
+ # test case: add punycode name
+ def test_md_100_010(self, env):
+ assert env.a2md(["add", "xn--kller-jua.punycode.de"]).exit_code == 0
+
+ # test case: don't sort alternate names
+ def test_md_100_011(self, env):
+ dns = ["test-100.com", "test-xxx.com", "test-aaa.com"]
+ jout = env.a2md(["add"] + dns).json
+ # DNS is only listed as specified
+ assert len(jout['output']) == 1
+ md = jout['output'][0]
+ assert md['domains'] == dns
+
+ # test case: add DNS wildcard
+ @pytest.mark.parametrize("wild_dns", [
+ "*.wildcard.com"
+ ])
+ def test_md_100_012(self, env, wild_dns):
+ assert env.a2md(["add", wild_dns]).exit_code == 0
Added: httpd/httpd/trunk/test/modules/md/test_110_reg_update.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_110_reg_update.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_110_reg_update.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_110_reg_update.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,273 @@
+# test a2md 'update' command for managed domains
+
+import pytest
+
+from .md_env import MDTestEnv
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
+@pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(),
+ reason="no ACME test server configured")
+class TestRegUpdate:
+
+ NAME1 = "greenbytes2.de"
+ NAME2 = "test-100.com"
+
+ @pytest.fixture(autouse=True, scope='function')
+ def _method_scope(self, env):
+ env.clear_store()
+ # add managed domains
+ domains = [
+ [self.NAME1, "www.greenbytes2.de", "mail.greenbytes2.de"],
+ [self.NAME2, "test-101.com", "test-102.com"]
+ ]
+ for dns in domains:
+ env.a2md(["-a", env.acme_url, "add"] + dns)
+
+ def teardown_method(self, method):
+ print("teardown_method: %s" % method.__name__)
+
+ # test case: update domains
+ def test_md_110_000(self, env):
+ dns = ["foo.de", "bar.de"]
+ output1 = env.a2md(["-vvvv", "update", self.NAME1, "domains"] + dns).json['output']
+ assert len(output1) == 1
+ env.check_json_contains(output1[0], {
+ "name": self.NAME1,
+ "domains": dns,
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+ assert env.a2md(["list"]).json['output'][0] == output1[0]
+
+ # test case: remove all domains
+ def test_md_110_001(self, env):
+ assert env.a2md(["update", self.NAME1, "domains"]).exit_code == 1
+
+ # test case: update domains with invalid DNS
+ @pytest.mark.parametrize("invalid_dns", [
+ "tld", "white sp.ace", "invalid.*.wildcard.com", "k\xc3ller.idn.com"
+ ])
+ def test_md_110_002(self, env, invalid_dns):
+ assert env.a2md(["update", self.NAME1, "domains", invalid_dns]).exit_code == 1
+
+ # test case: update domains with overlapping DNS list
+ def test_md_110_003(self, env):
+ dns = [self.NAME1, self.NAME2]
+ assert env.a2md(["update", self.NAME1, "domains"] + dns).exit_code == 1
+
+ # test case: update with subdomains
+ def test_md_110_004(self, env):
+ dns = ["test-foo.com", "sub.test-foo.com"]
+ md = env.a2md(["update", self.NAME1, "domains"] + dns).json['output'][0]
+ assert md['name'] == self.NAME1
+ assert md['domains'] == dns
+
+ # test case: update domains with duplicates
+ def test_md_110_005(self, env):
+ dns = [self.NAME1, self.NAME1, self.NAME1]
+ md = env.a2md(["update", self.NAME1, "domains"] + dns).json['output'][0]
+ assert md['name'] == self.NAME1
+ assert md['domains'] == [self.NAME1]
+
+ # test case: update domains with punycode
+ def test_md_110_006(self, env):
+ dns = [self.NAME1, "xn--kller-jua.punycode.de"]
+ md = env.a2md(["update", self.NAME1, "domains"] + dns).json['output'][0]
+ assert md['name'] == self.NAME1
+ assert md['domains'] == dns
+
+ # test case: update non-existing managed domain
+ def test_md_110_007(self, env):
+ assert env.a2md(["update", "test-foo.com", "domains", "test-foo.com"]).exit_code == 1
+
+ # test case: update domains with DNS wildcard
+ @pytest.mark.parametrize("wild_dns", [
+ "*.wildcard.com"
+ ])
+ def test_md_110_008(self, env, wild_dns):
+ assert env.a2md(["update", self.NAME1, "domains", wild_dns]).exit_code == 0
+
+ # --------- update ca ---------
+
+ # test case: update CA URL
+ def test_md_110_100(self, env):
+ url = "http://localhost.com:9999"
+ output = env.a2md(["update", self.NAME1, "ca", url]).json['output']
+ assert len(output) == 1
+ env.check_json_contains(output[0], {
+ "name": self.NAME1,
+ "domains": [self.NAME1, "www.greenbytes2.de", "mail.greenbytes2.de"],
+ "contacts": [],
+ "ca": {
+ "url": url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+
+ # test case: update CA with invalid URL
+ @pytest.mark.parametrize("invalid_url", [
+ "no.schema/path", "http://white space/path", "http://bad.port:-1/path"
+ ])
+ def test_md_110_101(self, env, invalid_url):
+ assert env.a2md(["update", self.NAME1, "ca", invalid_url]).exit_code == 1
+
+ # test case: update ca protocol
+ def test_md_110_102(self, env):
+ md = env.a2md(["update", self.NAME1, "ca", env.acme_url, "FOO"]).json['output'][0]
+ env.check_json_contains(md['ca'], {
+ "url": env.acme_url,
+ "proto": "FOO"
+ })
+ assert md['state'] == 1
+
+ # test case: update account ID
+ def test_md_110_200(self, env):
+ acc_id = "test.account.id"
+ output = env.a2md(["update", self.NAME1, "account", acc_id]).json['output']
+ assert len(output) == 1
+ env.check_json_contains(output[0], {
+ "name": self.NAME1,
+ "domains": [self.NAME1, "www.greenbytes2.de", "mail.greenbytes2.de"],
+ "contacts": [],
+ "ca": {
+ "account": acc_id,
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+
+ # test case: remove account ID
+ def test_md_110_201(self, env):
+ assert env.a2md(["update", self.NAME1, "account", "test.account.id"]).exit_code == 0
+ md = env.a2md(["update", self.NAME1, "account"]).json['output'][0]
+ env.check_json_contains(md['ca'], {
+ "url": env.acme_url,
+ "proto": "ACME"
+ })
+ assert md['state'] == 1
+
+ # test case: change existing account ID
+ def test_md_110_202(self, env):
+ assert env.a2md(["update", self.NAME1, "account", "test.account.id"]).exit_code == 0
+ md = env.a2md(["update", self.NAME1, "account", "foo.test.com"]).json['output'][0]
+ env.check_json_contains(md['ca'], {
+ "account": "foo.test.com",
+ "url": env.acme_url,
+ "proto": "ACME"
+ })
+ assert md['state'] == 1
+
+ # test case: ignore additional argument
+ def test_md_110_203(self, env):
+ md = env.a2md(["update", self.NAME1, "account", "test.account.id",
+ "test2.account.id"]).json['output'][0]
+ env.check_json_contains(md['ca'], {
+ "account": "test.account.id",
+ "url": env.acme_url,
+ "proto": "ACME"
+ })
+ assert md['state'] == 1
+
+ # test case: add contact info
+ def test_md_110_300(self, env):
+ mail = "test@greenbytes.de"
+ output = env.a2md(["update", self.NAME1, "contacts", mail]).json['output']
+ assert len(output) == 1
+ env.check_json_contains(output[0], {
+ "name": self.NAME1,
+ "domains": [self.NAME1, "www.greenbytes2.de", "mail.greenbytes2.de"],
+ "contacts": ["mailto:" + mail],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+
+ # test case: add multiple contact info, preserve order
+ def test_md_110_301(self, env):
+ mail = ["xxx@greenbytes.de", "aaa@greenbytes.de"]
+ md = env.a2md(["update", self.NAME1, "contacts"] + mail).json['output'][0]
+ assert md['contacts'] == ["mailto:" + mail[0], "mailto:" + mail[1]]
+ assert md['state'] == 1
+
+ # test case: must not remove contact info
+ def test_md_110_302(self, env):
+ assert env.a2md(["update", self.NAME1, "contacts", "test@greenbytes.de"]).exit_code == 0
+ assert env.a2md(["update", self.NAME1, "contacts"]).exit_code == 1
+
+ # test case: replace existing contact info
+ def test_md_110_303(self, env):
+ assert env.a2md(["update", self.NAME1, "contacts", "test@greenbytes.de"]).exit_code == 0
+ md = env.a2md(["update", self.NAME1, "contacts", "xxx@greenbytes.de"]).json['output'][0]
+ assert md['contacts'] == ["mailto:xxx@greenbytes.de"]
+ assert md['state'] == 1
+
+ # test case: use invalid mail address
+ @pytest.mark.parametrize("invalid_mail", [
+ "no.at.char", "with blank@test.com", "missing.host@", "@missing.localpart.de",
+ "double..dot@test.com", "double@at@test.com"
+ ])
+ def test_md_110_304(self, env, invalid_mail):
+ # SEI: Hmm, it is not sensible to build a complete verification of
+ # https://tools.ietf.org/html/rfc822, is it?
+ assert env.a2md(["update", self.NAME1, "contacts", invalid_mail]).exit_code == 1
+
+ # test case: respect urls as given
+ @pytest.mark.parametrize("url", [
+ "mailto:test@greenbytes.de", "wrong://schema@test.com"])
+ def test_md_110_305(self, env, url):
+ md = env.a2md(["update", self.NAME1, "contacts", url]).json['output'][0]
+ assert md['contacts'] == [url]
+ assert md['state'] == 1
+
+ # test case: add tos agreement
+ def test_md_110_400(self, env):
+ output = env.a2md(["update", self.NAME1, "agreement", env.acme_tos]).json['output']
+ assert len(output) == 1
+ env.check_json_contains(output[0], {
+ "name": self.NAME1,
+ "domains": [self.NAME1, "www.greenbytes2.de", "mail.greenbytes2.de"],
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME",
+ "agreement": env.acme_tos
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+
+ # test case: remove tos agreement
+ def test_md_110_402(self, env):
+ assert env.a2md(["update", self.NAME1, "agreement", env.acme_tos]).exit_code == 0
+ md = env.a2md(["update", self.NAME1, "agreement"]).json['output'][0]
+ env.check_json_contains(md['ca'], {
+ "url": env.acme_url,
+ "proto": "ACME"
+ })
+ assert md['state'] == 1
+
+ # test case: ignore additional arguments
+ def test_md_110_403(self, env):
+ md = env.a2md(["update", self.NAME1, "agreement",
+ env.acme_tos, "http://invalid.tos/"]).json['output'][0]
+ env.check_json_contains(md['ca'], {
+ "url": env.acme_url,
+ "proto": "ACME",
+ "agreement": env.acme_tos
+ })
+ assert md['state'] == 1
+
+ # test case: update agreement with invalid URL
+ @pytest.mark.parametrize("invalid_url", [
+ "no.schema/path", "http://white space/path", "http://bad.port:-1/path"
+ ])
+ def test_md_110_404(self, env, invalid_url):
+ assert env.a2md(["update", self.NAME1, "agreement", invalid_url]).exit_code == 1
Added: httpd/httpd/trunk/test/modules/md/test_120_reg_list.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_120_reg_list.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_120_reg_list.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_120_reg_list.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,87 @@
+# test a2md 'list' command for managed domains
+
+from shutil import copyfile
+
+import pytest
+
+from .md_env import MDTestEnv
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
+@pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(),
+ reason="no ACME test server configured")
+class TestRegAdd:
+
+ @pytest.fixture(autouse=True, scope='function')
+ def _method_scope(self, env):
+ env.clear_store()
+
+ # test case: list empty store
+ def test_md_120_000(self, env):
+ assert env.a2md(["list"]).json == env.EMPTY_JOUT
+
+ # test case: list two managed domains
+ def test_md_120_001(self, env):
+ domains = [
+ ["test120-001.com", "test120-001a.com", "test120-001b.com"],
+ ["greenbytes2.de", "www.greenbytes2.de", "mail.greenbytes2.de"]
+ ]
+ for dns in domains:
+ assert env.a2md(["add"] + dns).exit_code == 0
+ #
+ # list all store content
+ jout = env.a2md(["list"]).json
+ assert len(jout['output']) == len(domains)
+ domains.reverse()
+ for i in range(0, len(jout['output'])):
+ env.check_json_contains(jout['output'][i], {
+ "name": domains[i][0],
+ "domains": domains[i],
+ "contacts": [],
+ "ca": {
+ "url": env.acme_url,
+ "proto": "ACME"
+ },
+ "state": env.MD_S_INCOMPLETE
+ })
+ # list md by name
+ for dns in ["test120-001.com", "greenbytes2.de"]:
+ md = env.a2md(["list", dns]).json['output'][0]
+ assert md['name'] == dns
+
+ # test case: validate md state in store
+ def test_md_120_002(self, env):
+ # check: md without pkey/cert -> INCOMPLETE
+ domain = f"test1.{env.http_tld}"
+ assert env.a2md(["add", domain]).exit_code == 0
+ assert env.a2md(["update", domain, "contacts", "admin@" + domain]).exit_code == 0
+ assert env.a2md(["update", domain, "agreement", env.acme_tos]).exit_code == 0
+ assert env.a2md(["list", domain]).json['output'][0]['state'] == env.MD_S_INCOMPLETE
+ # check: valid pkey/cert -> COMPLETE
+ cred = env.get_credentials_for_name(domain)[0]
+ copyfile(cred.pkey_file, env.store_domain_file(domain, 'privkey.pem'))
+ copyfile(cred.cert_file, env.store_domain_file(domain, 'pubcert.pem'))
+ assert env.a2md(["list", domain]).json['output'][0]['state'] == env.MD_S_COMPLETE
+ # check: expired cert -> reported as INCOMPLETE with the renew flag set
+ cred = env.get_credentials_for_name(f"expired.{env.http_tld}")[0]
+ copyfile(cred.pkey_file, env.store_domain_file(domain, 'privkey.pem'))
+ copyfile(cred.cert_file, env.store_domain_file(domain, 'pubcert.pem'))
+ out = env.a2md(["list", domain]).json['output'][0]
+ assert out['state'] == env.MD_S_INCOMPLETE
+ assert out['renew'] is True
+
+ # test case: broken cert file
+ def test_md_120_003(self, env):
+ domain = f"test1.{env.http_tld}"
+ assert env.a2md(["add", domain]).exit_code == 0
+ assert env.a2md(["update", domain, "contacts", "admin@" + domain]).exit_code == 0
+ assert env.a2md(["update", domain, "agreement", env.acme_tos]).exit_code == 0
+ # check: valid pkey/cert -> COMPLETE
+ cred = env.get_credentials_for_name(domain)[0]
+ copyfile(cred.pkey_file, env.store_domain_file(domain, 'privkey.pem'))
+ copyfile(cred.cert_file, env.store_domain_file(domain, 'pubcert.pem'))
+ assert env.a2md(["list", domain]).json['output'][0]['state'] == env.MD_S_COMPLETE
+ # check: replace cert by broken file -> state falls back to INCOMPLETE
+ with open(env.store_domain_file(domain, 'pubcert.pem'), 'w') as fd:
+ fd.write("dummy\n")
+ assert env.a2md(["list", domain]).json['output'][0]['state'] == env.MD_S_INCOMPLETE
Added: httpd/httpd/trunk/test/modules/md/test_202_acmev2_regs.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_202_acmev2_regs.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_202_acmev2_regs.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_202_acmev2_regs.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,132 @@
+# test mod_md ACMEv2 registrations
+
+import re
+import json
+import pytest
+
+from .md_conf import MDConf
+from .md_env import MDTestEnv
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
+@pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(),
+ reason="no ACME test server configured")
+class TestAcmeAcc:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, acme):
+ acme.start(config='default')
+ env.check_acme()
+ env.APACHE_CONF_SRC = "data/test_drive"
+ MDConf(env).install()
+ assert env.apache_restart() == 0
+
+ @pytest.fixture(autouse=True, scope='function')
+ def _method_scope(self, env):
+ env.check_acme()
+ env.clear_store()
+
+ # test case: register a new account, vary length to check base64 encoding
+ @pytest.mark.parametrize("contact", [
+ "x@not-forbidden.org", "xx@not-forbidden.org", "xxx@not-forbidden.org"
+ ])
+ def test_md_202_000(self, env, contact):
+ r = env.a2md(["-t", "accepted", "acme", "newreg", contact], raw=True)
+ assert r.exit_code == 0, r
+ m = re.match("registered: (.*)$", r.stdout)
+ assert m, "did not match: {0}".format(r.stdout)
+ acct = m.group(1)
+ print("newreg: %s" % m.group(1))
+ self._check_account(env, acct, ["mailto:" + contact])
+
+ # test case: register a new account without accepting ToS, must fail
+ def test_md_202_000b(self, env):
+ r = env.a2md(["acme", "newreg", "x@not-forbidden.org"], raw=True)
+ assert r.exit_code == 1
+ m = re.match(".*must agree to terms of service.*", r.stderr)
+ if m is None:
+ # the pebble variant
+ m = re.match(".*account did not agree to the terms of service.*", r.stderr)
+ assert m, "did not match: {0}".format(r.stderr)
+
+ # test case: respect 'mailto:' prefix in contact url
+ def test_md_202_001(self, env):
+ contact = "mailto:xx@not-forbidden.org"
+ r = env.a2md(["-t", "accepted", "acme", "newreg", contact], raw=True)
+ assert r.exit_code == 0
+ m = re.match("registered: (.*)$", r.stdout)
+ assert m
+ acct = m.group(1)
+ self._check_account(env, acct, [contact])
+
+ # test case: fail on invalid contact url
+ @pytest.mark.parametrize("invalid_contact", [
+ "mehlto:xxx@not-forbidden.org", "no.at.char", "with blank@test.com",
+ "missing.host@", "@missing.localpart.de",
+ "double..dot@test.com", "double@at@test.com"
+ ])
+ def test_md_202_002(self, env, invalid_contact):
+ assert env.a2md(["acme", "newreg", invalid_contact]).exit_code == 1
+
+ # test case: use contact list
+ def test_md_202_003(self, env):
+ contact = ["xx@not-forbidden.org", "aa@not-forbidden.org"]
+ r = env.a2md(["-t", "accepted", "acme", "newreg"] + contact, raw=True)
+ assert r.exit_code == 0
+ m = re.match("registered: (.*)$", r.stdout)
+ assert m
+ acct = m.group(1)
+ self._check_account(env, acct, ["mailto:" + contact[0], "mailto:" + contact[1]])
+
+ # test case: validate new account
+ def test_md_202_100(self, env):
+ acct = self._prepare_account(env, ["tmp@not-forbidden.org"])
+ assert env.a2md(["acme", "validate", acct]).exit_code == 0
+
+ # test case: fail on non-existing account
+ def test_md_202_101(self, env):
+ assert env.a2md(["acme", "validate", "ACME-localhost-1000"]).exit_code == 1
+
+ # test case: report fail on request signing problem
+ def test_md_202_102(self, env):
+ acct = self._prepare_account(env, ["tmp@not-forbidden.org"])
+ with open(env.path_account(acct)) as f:
+ acctj = json.load(f)
+ acctj['url'] = acctj['url'] + "0"  # alter the stored account URL so validation must fail
+ with open(env.path_account(acct), "w") as f: f.write(json.dumps(acctj))  # closed (flushed) before a2md reads it
+ assert env.a2md(["acme", "validate", acct]).exit_code == 1
+
+ # test case: register and try delete an account, will fail without persistence
+ def test_md_202_200(self, env):
+ acct = self._prepare_account(env, ["tmp@not-forbidden.org"])
+ assert env.a2md(["delreg", acct]).exit_code == 1
+
+ # test case: register and try delete an account with persistence
+ def test_md_202_201(self, env):
+ acct = self._prepare_account(env, ["tmp@not-forbidden.org"])
+ assert env.a2md(["acme", "delreg", acct]).exit_code == 0
+ # check that store is clean
+ r = env.run(["find", env.store_dir])
+ assert re.match(re.escape(env.store_dir), r.stdout)  # the path is literal text, not a regex pattern
+
+ # test case: delete a persisted account without specifying url
+ def test_md_202_202(self, env):
+ acct = self._prepare_account(env, ["tmp@not-forbidden.org"])
+ assert env.run([env.a2md_bin, "-d", env.store_dir, "acme", "delreg", acct]).exit_code == 0
+
+ # test case: delete, then validate an account
+ def test_md_202_203(self, env):
+ acct = self._prepare_account(env, ["test014@not-forbidden.org"])
+ assert env.a2md(["acme", "delreg", acct]).exit_code == 0
+ # validate on deleted account fails
+ assert env.a2md(["acme", "validate", acct]).exit_code == 1
+
+ def _check_account(self, env, acct, contact):
+ with open(env.path_account(acct)) as f:
+ acctj = json.load(f)
+ assert acctj['registration']['contact'] == contact
+
+ def _prepare_account(self, env, contact):
+ r = env.a2md(["-t", "accepted", "acme", "newreg"] + contact, raw=True)
+ assert r.exit_code == 0
+ return re.match("registered: (.*)$", r.stdout).group(1)
Added: httpd/httpd/trunk/test/modules/md/test_300_conf_validate.py
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/test/modules/md/test_300_conf_validate.py?rev=1894611&view=auto
==============================================================================
--- httpd/httpd/trunk/test/modules/md/test_300_conf_validate.py (added)
+++ httpd/httpd/trunk/test/modules/md/test_300_conf_validate.py Fri Oct 29 10:05:29 2021
@@ -0,0 +1,344 @@
+# test mod_md basic configurations
+
+import re
+import time
+from datetime import datetime, timedelta
+
+import pytest
+
+from .md_conf import MDConf
+from .md_env import MDTestEnv
+
+
+@pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(),
+ reason="no ACME test server configured")
+class TestConf:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env):
+ """Class-scoped autouse fixture: calls env.clear_store() once for this test class."""
+ env.clear_store()
+
+ # test case: just one MDomain definition
+ def test_md_300_001(self, env):
+ """Install a configuration with a single MDomain line; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: two MDomain definitions, non-overlapping
+ def test_md_300_002(self, env):
+ """Install two MDomain lines that share no domain name; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org
+ MDomain example2.org www.example2.org mail.example2.org
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: two MDomain definitions, exactly the same
+ def test_md_300_003(self, env):
+ """Stop the server, install the same MDomain line twice and expect apache_fail() == 0."""
+ assert env.apache_stop() == 0
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ """).install()
+ assert env.apache_fail() == 0
+
+ # test case: two MDomain definitions, overlapping
+ def test_md_300_004(self, env):
+ """Stop the server, install two MDomain lines that both list test3.not-forbidden.org
+ and expect apache_fail() == 0."""
+ assert env.apache_stop() == 0
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ MDomain example2.org test3.not-forbidden.org www.example2.org mail.example2.org
+ """).install()
+ assert env.apache_fail() == 0
+
+ # test case: two MDomains, one inside a virtual host
+ def test_md_300_005(self, env):
+ """Install one global MDomain and one MDomain inside a <VirtualHost> that has no
+ ServerName; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ <VirtualHost *:12346>
+ MDomain example2.org www.example2.org www.example3.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: two MDomains, one correct vhost name
+ def test_md_300_006(self, env):
+ """Install one global MDomain and one MDomain inside a <VirtualHost> whose ServerName
+ is the first name of that MDomain; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ <VirtualHost *:12346>
+ ServerName example2.org
+ MDomain example2.org www.example2.org www.example3.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: two MDomains, two correct vhost names
+ def test_md_300_007(self, env):
+ """Like test_md_300_006, plus a second <VirtualHost> named www.example2.org, a name
+ listed in the MDomain of the first vhost; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ <VirtualHost *:12346>
+ ServerName example2.org
+ MDomain example2.org www.example2.org www.example3.org
+ </VirtualHost>
+ <VirtualHost *:12346>
+ ServerName www.example2.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: two MDomains, overlapping vhosts
+ def test_md_300_008(self, env):
+ """Install two <VirtualHost> blocks that both carry the name example2.org (once as
+ ServerName, once as ServerAlias); apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ <VirtualHost *:12346>
+ ServerName example2.org
+ ServerAlias www.example3.org
+ MDomain example2.org www.example2.org www.example3.org
+ </VirtualHost>
+
+ <VirtualHost *:12346>
+ ServerName www.example2.org
+ ServerAlias example2.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: vhosts with overlapping MDs
+ def test_md_300_009(self, env):
+ """Stop the server, define two MDs under 'MDMembers manual' and add three SSL vhosts
+ on port 12346, the last one naming a domain from each MD; expect apache_fail() == 0."""
+ assert env.apache_stop() == 0
+ conf = MDConf(env)
+ conf.add("""
+ MDMembers manual
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ MDomain example2.org www.example2.org www.example3.org
+ """)
+ conf.add_vhost(port=12346, domains=["example2.org", "www.example3.org"], with_ssl=True)
+ conf.add_vhost(port=12346, domains=["www.example2.org", "example2.org"], with_ssl=True)
+ conf.add_vhost(port=12346, domains=["not-forbidden.org", "example2.org"], with_ssl=True)
+ conf.install()
+ assert env.apache_fail() == 0
+ env.apache_stop()
+ # NOTE(review): presumably discards the errors just logged so later checks ignore them - confirm
+ env.httpd_error_log.ignore_recent()
+
+ # test case: MDomain, vhost with matching ServerAlias
+ def test_md_300_010(self, env):
+ """Install an MDomain and a <VirtualHost> whose ServerName and ServerAlias are both
+ listed in that MDomain; apache_restart() must return 0."""
+ conf = MDConf(env)
+ conf.add("""
+ MDomain not-forbidden.org www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+
+ <VirtualHost *:12346>
+ ServerName not-forbidden.org
+ ServerAlias test3.not-forbidden.org
+ </VirtualHost>
+ """)
+ conf.install()
+ assert env.apache_restart() == 0
+
+ # test case: MDomain, misses one ServerAlias
+ def test_md_300_011a(self, env):
+ """Install an MDomain marked 'manual' and a vhost on env.https_port that also names
+ test4.not-forbidden.org, which the MDomain does not list; expect apache_fail() == 0."""
+ env.apache_stop()
+ conf = MDConf(env, text="""
+ MDomain not-forbidden.org manual www.not-forbidden.org mail.not-forbidden.org test3.not-forbidden.org
+ """)
+ conf.add_vhost(port=env.https_port, domains=[
+ "not-forbidden.org", "test3.not-forbidden.org", "test4.not-forbidden.org"
+ ])
+ conf.install()
+ assert env.apache_fail() == 0
+ env.apache_stop()
+
+ # test case: MDomain, misses one ServerAlias, but auto add enabled
+ def test_md_300_011b(self, env):
+ """Install an MDomain marked 'auto' and a vhost on env.https_port with two aliases
+ the MDomain does not list; apache_restart() must return 0."""
+ env.apache_stop()
+ MDConf(env, text="""
+ MDomain not-forbidden.org auto mail.not-forbidden.org
+
+ <VirtualHost *:%s>
+ ServerName not-forbidden.org
+ ServerAlias test3.not-forbidden.org
+ ServerAlias test4.not-forbidden.org
+ </VirtualHost>
+ """ % env.https_port).install()
+ assert env.apache_restart() == 0
+
+ # test case: MDomain does not match any vhost
+ def test_md_300_012(self, env):
+ """Install an MDomain whose names appear in no <VirtualHost>; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain example012.org www.example012.org
+ <VirtualHost *:12346>
+ ServerName not-forbidden.org
+ ServerAlias test3.not-forbidden.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: one md covers two vhosts
+ def test_md_300_013(self, env):
+ """Install one MDomain listing the ServerNames of two separate <VirtualHost> blocks;
+ apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain example2.org test-a.example2.org test-b.example2.org
+ <VirtualHost *:12346>
+ ServerName test-a.example2.org
+ </VirtualHost>
+ <VirtualHost *:12346>
+ ServerName test-b.example2.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: global server name as managed domain name
+ def test_md_300_014(self, env):
+ """Install an MDomain whose first name is www.<env.http_tld> together with a vhost
+ for its second name; apache_restart() must return 0."""
+ MDConf(env, text=f"""
+ MDomain www.{env.http_tld} www.example2.org
+
+ <VirtualHost *:12346>
+ ServerName www.example2.org
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: valid pkey specification
+ def test_md_300_015(self, env):
+ """Install five MDPrivateKeys lines (Default, RSA, RSA 2048/3072/4096) in one
+ configuration; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDPrivateKeys Default
+ MDPrivateKeys RSA
+ MDPrivateKeys RSA 2048
+ MDPrivateKeys RSA 3072
+ MDPrivateKeys RSA 4096
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: invalid pkey specification
+    @pytest.mark.parametrize("line,exp_err_msg", [
+        ("MDPrivateKeys", "needs to specify the private key type"),
+        ("MDPrivateKeys Default RSA 1024", "'Default' allows no other parameter"),
+        ("MDPrivateKeys RSA 1024", "must be 2048 or higher"),
+        ("MDPrivateKeys rsa 2048 rsa 4096", "two keys of type 'RSA' are not possible"),
+        ("MDPrivateKeys p-256 secp384r1 P-256", "two keys of type 'P-256' are not possible"),
+    ])
+    def test_md_300_016(self, env, line, exp_err_msg):
+        """Install a single invalid MDPrivateKeys line; the server must refuse the
+        configuration and apachectl's stderr must contain the expected message.
+
+        :param line: the configuration line under test
+        :param exp_err_msg: text that has to appear in env.apachectl_stderr
+        """
+        MDConf(env, text=line).install()
+        assert env.apache_fail() == 0, "Server accepted test config {}".format(line)
+        assert exp_err_msg in env.apachectl_stderr
+
+ # test case: invalid renew window directive
+    @pytest.mark.parametrize("line,exp_err_msg", [
+        ("MDRenewWindow dec-31", "has unrecognized format"),
+        ("MDRenewWindow 1y", "has unrecognized format"),
+        ("MDRenewWindow 10 d", "takes one argument"),
+        ("MDRenewWindow 102%", "a length of 100% or more is not allowed."),
+    ])
+    def test_md_300_017(self, env, line, exp_err_msg):
+        """Install a single invalid MDRenewWindow line, expect apache_fail() == 0 and
+        the given message in apachectl's stderr."""
+        conf = MDConf(env, text=line)
+        conf.install()
+        rc = env.apache_fail()
+        assert rc == 0
+        assert exp_err_msg in env.apachectl_stderr
+
+ # test case: invalid uri for MDHttpProxy
+    @pytest.mark.parametrize("line,exp_err_msg", [
+        ("MDHttpProxy", "takes one argument"),
+        ("MDHttpProxy localhost:8080", "scheme must be http or https"),
+        ("MDHttpProxy https://127.0.0.1:-443", "invalid port"),
+        ("MDHttpProxy HTTP localhost 8080", "takes one argument"),
+    ])
+    def test_md_300_018(self, env, line, exp_err_msg):
+        """Install a single invalid MDHttpProxy line, expect the server to refuse it and
+        the given message in apachectl's stderr."""
+        conf = MDConf(env, text=line)
+        conf.install()
+        rc = env.apache_fail()
+        assert rc == 0, "Server accepted test config {}".format(line)
+        assert exp_err_msg in env.apachectl_stderr
+
+ # test case: invalid parameter for MDRequireHttps
+    @pytest.mark.parametrize("line,exp_err_msg", [
+        ("MDRequireHTTPS yes", "supported parameter values are 'temporary' and 'permanent'"),
+        ("MDRequireHTTPS", "takes one argument"),
+    ])
+    def test_md_300_019(self, env, line, exp_err_msg):
+        """Install a single invalid MDRequireHTTPS line, expect the server to refuse it and
+        the given message in apachectl's stderr."""
+        conf = MDConf(env, text=line)
+        conf.install()
+        rc = env.apache_fail()
+        assert rc == 0, "Server accepted test config {}".format(line)
+        assert exp_err_msg in env.apachectl_stderr
+
+ # test case: invalid parameter for MDMustStaple
+    @pytest.mark.parametrize("line,exp_err_msg", [
+        ("MDMustStaple", "takes one argument"),
+        ("MDMustStaple yes", "supported parameter values are 'on' and 'off'"),
+        ("MDMustStaple true", "supported parameter values are 'on' and 'off'"),
+    ])
+    def test_md_300_020(self, env, line, exp_err_msg):
+        """Install a single invalid MDMustStaple line, expect the server to refuse it and
+        the given message in apachectl's stderr; finally call ignore_recent() on the
+        error log."""
+        conf = MDConf(env, text=line)
+        conf.install()
+        rc = env.apache_fail()
+        assert rc == 0, "Server accepted test config {}".format(line)
+        assert exp_err_msg in env.apachectl_stderr
+        env.httpd_error_log.ignore_recent()
+
+ # test case: alt-names incomplete detection, github issue #68
+ def test_md_300_021(self, env):
+ """Under 'MDMembers manual', define the MD secret.com and a vhost on port 12344 that
+ also names not.secret.com; expect apache_fail() == 0. The error log scan that
+ follows is commented out."""
+ env.apache_stop()
+ conf = MDConf(env, text="""
+ MDMembers manual
+ MDomain secret.com
+ """)
+ conf.add_vhost(port=12344, domains=[
+ "not.secret.com", "secret.com"
+ ])
+ conf.install()
+ assert env.apache_fail() == 0
+ # this is unreliable on debian
+ #assert env.httpd_error_log.scan_recent(
+ # re.compile(r'.*Virtual Host not.secret.com:0 matches Managed Domain \'secret.com\', '
+ # 'but the name/alias not.secret.com itself is not managed. A requested '
+ # 'MD certificate will not match ServerName.*'), timeout=10
+ #)
+
+ # test case: MDRequireHttps inside an <If> block, outside of any <Directory>, is accepted
+ def test_md_300_022(self, env):
+ """Install 'MDRequireHttps temporary' wrapped in an <If "1 == 1"> block next to a
+ vhost for secret.com; apache_restart() must return 0."""
+ MDConf(env, text="""
+ MDomain secret.com
+ <If "1 == 1">
+ MDRequireHttps temporary
+ </If>
+ <VirtualHost *:12344>
+ ServerName secret.com
+ </VirtualHost>
+ """).install()
+ assert env.apache_restart() == 0
+
+ # test case: MDRequireHttps inside a <Directory> block is rejected
+ def test_md_300_023(self, env):
+ """Install 'MDRequireHttps temporary' inside a <Directory /tmp> block plus a vhost
+ for secret.com on port 12344; expect apache_fail() == 0."""
+ conf = MDConf(env, text="""
+ MDomain secret.com
+ <Directory /tmp>
+ MDRequireHttps temporary
+ </Directory>
+ """)
+ conf.add_vhost(port=12344, domains=["secret.com"])
+ conf.install()
+ assert env.apache_fail() == 0
+
+ # test case: invalid parameter for MDCertificateAuthority
+    @pytest.mark.parametrize("ca,exp_err_msg", [
+        ("", "takes one argument"),
+        ("yes", "The CA name 'yes' is not known "),
+    ])
+    def test_md_300_024(self, env, ca, exp_err_msg):
+        """Install an invalid MDCertificateAuthority value; the server must refuse the
+        configuration and apachectl's stderr must contain the expected message.
+
+        :param ca: value placed after MDCertificateAuthority (may be empty)
+        :param exp_err_msg: text that has to appear in env.apachectl_stderr
+        """
+        # MDRenewMode manual: lets not contact these in testing. The remark is kept
+        # here because httpd does not allow a comment on the same line as a
+        # directive; there it would be parsed as extra directive arguments.
+        conf = MDConf(env, text=f"""
+            MDCertificateAuthority {ca}
+            MDRenewMode manual
+            """)
+        conf.install()
+        assert env.apache_fail() == 0
+        assert exp_err_msg in env.apachectl_stderr
+
+ # test case: valid parameter for MDCertificateAuthority
+ @pytest.mark.parametrize("ca, url", [
+ ("LetsEncrypt", "https://acme-v02.api.letsencrypt.org/directory"),
+ ("letsencrypt", "https://acme-v02.api.letsencrypt.org/directory"),
+ ("letsencrypt-test", "https://acme-staging-v02.api.letsencrypt.org/directory"),
+ ("LETSEncrypt-TESt", "https://acme-staging-v02.api.letsencrypt.org/directory"),
+ ("buypass", "https://api.buypass.com/acme/directory"),
+ ("buypass-test", "https://api.test4.buypass.no/acme/directory"),
+ ])
+ def test_md_300_025(self, env, ca, url):
+ """For each CA name (given in varying letter case), install it via
+ MDCertificateAuthority with 'MDRenewMode manual', add an MD for test1.<env.http_tld>,
+ expect apache_restart() == 0 and the MD status to report `url` under ['ca']['url']."""
+ domain = f"test1.{env.http_tld}"
+ conf = MDConf(env, text=f"""
+ MDCertificateAuthority {ca}
+ MDRenewMode manual
+ """)
+ conf.add_md([domain])
+ conf.install()
+ assert env.apache_restart() == 0, "Server did not accepted CA '{}'".format(ca)
+ md = env.get_md_status(domain)
+ assert md['ca']['url'] == url
+