You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/05 23:30:41 UTC
[incubator-iceberg] branch master updated: Adding
InclusiveManifestEvaluator and ResidualEvaluator (#205)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f2e8b78 Adding InclusiveManifestEvaluator and ResidualEvaluator (#205)
f2e8b78 is described below
commit f2e8b78f6bffbe953f05547fed5ee572a658fb17
Author: TGooch44 <te...@gmail.com>
AuthorDate: Wed Jun 5 16:30:37 2019 -0700
Adding InclusiveManifestEvaluator and ResidualEvaluator (#205)
---
python/iceberg/api/expressions/__init__.py | 4 +
python/iceberg/api/expressions/evaluator.py | 15 +-
.../expressions/inclusive_manifest_evaluator.py | 161 ++++++++++++++++++++
.../api/expressions/inclusive_metrics_evaluator.py | 18 ++-
.../iceberg/api/expressions/residual_evaluator.py | 119 +++++++++++++++
.../api/expressions/strict_metrics_evaluator.py | 20 ++-
.../test_inclusive_manifest_evaluator.py | 166 +++++++++++++++++++++
7 files changed, 481 insertions(+), 22 deletions(-)
diff --git a/python/iceberg/api/expressions/__init__.py b/python/iceberg/api/expressions/__init__.py
index fdc5448..4c2189f 100644
--- a/python/iceberg/api/expressions/__init__.py
+++ b/python/iceberg/api/expressions/__init__.py
@@ -37,6 +37,7 @@ __all__ = ["ABOVE_MAX",
"FixedLiteral",
"FloatLiteral",
"inclusive",
+ "InclusiveManifestEvaluator",
"InclusiveMetricsEvaluator",
"InclusiveProjection",
"IntegerLiteral",
@@ -52,6 +53,7 @@ __all__ = ["ABOVE_MAX",
"Or",
"Predicate",
"Reference",
+ "ResidualEvaluator",
"strict",
"StrictMetricsEvaluator",
"StrictProjection",
@@ -73,6 +75,7 @@ from .expression import (And,
TRUE,
TrueExp)
from .expressions import Expressions, ExpressionVisitors
+from .inclusive_manifest_evaluator import InclusiveManifestEvaluator
from .inclusive_metrics_evaluator import InclusiveMetricsEvaluator
from .java_variables import (JAVA_MAX_FLOAT,
JAVA_MAX_INT,
@@ -102,4 +105,5 @@ from .projections import (inclusive,
from .reference import (BoundReference,
NamedReference,
Reference)
+from .residual_evaluator import ResidualEvaluator
from .strict_metrics_evaluator import StrictMetricsEvaluator
diff --git a/python/iceberg/api/expressions/evaluator.py b/python/iceberg/api/expressions/evaluator.py
index 340f352..3b0bbc6 100644
--- a/python/iceberg/api/expressions/evaluator.py
+++ b/python/iceberg/api/expressions/evaluator.py
@@ -15,22 +15,25 @@
# specific language governing permissions and limitations
# under the License.
+import threading
+
from .binder import Binder
from .expressions import ExpressionVisitors
class Evaluator(object):
+ THREAD_LOCAL_DATA = threading.local()
+
+ def visitor(self):
+ if not hasattr(Evaluator.THREAD_LOCAL_DATA, "visitors") :
+ Evaluator.THREAD_LOCAL_DATA.visitors = Evaluator.EvalVisitor()
+
+ return Evaluator.THREAD_LOCAL_DATA.visitors
def __init__(self, struct, unbound, case_sensitive=True):
self.expr = Binder.bind(struct, unbound, case_sensitive)
self.visitors = None
- def visitor(self):
- if self.visitors is None:
- self.visitors = Evaluator.EvalVisitor()
-
- return self.visitors
-
def eval(self, data):
return self.visitor().eval(data, self.expr)
diff --git a/python/iceberg/api/expressions/inclusive_manifest_evaluator.py b/python/iceberg/api/expressions/inclusive_manifest_evaluator.py
new file mode 100644
index 0000000..e1c55fb
--- /dev/null
+++ b/python/iceberg/api/expressions/inclusive_manifest_evaluator.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+from .binder import Binder
+from .expressions import Expressions, ExpressionVisitors
+from .projections import inclusive
+from ..types import Conversions
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+
+
+class InclusiveManifestEvaluator(object):
+
+ def visitor(self):
+ if not hasattr(self.thread_local_data, "visitors"):
+ self.thread_local_data.visitors = ManifestEvalVistor(self.expr)
+
+ return self.thread_local_data.visitors
+
+ def __init__(self, spec, row_filter, case_sensitive=True):
+ self.struct = spec.partition_type()
+ self.expr = Binder.bind(self.struct,
+ Expressions.rewrite_not(inclusive(spec, case_sensitive=case_sensitive)
+ .project(row_filter)),
+ case_sensitive=case_sensitive)
+ self.thread_local_data = threading.local()
+
+ def eval(self, manifest):
+ return self.visitor().eval(manifest)
+
+
+class ManifestEvalVistor(ExpressionVisitors.BoundExpressionVisitor):
+
+ def __init__(self, expr):
+ self.expr = expr
+ self.stats = None
+
+ def eval(self, manifest):
+ self.stats = manifest.partitions
+ if self.stats is None:
+ return ROWS_MIGHT_MATCH
+
+ return ExpressionVisitors.visit(self.expr, self)
+
+ def always_true(self):
+ return ROWS_MIGHT_MATCH
+
+ def always_false(self):
+ return ROWS_CANNOT_MATCH
+
+ def not_(self, result):
+ return not result
+
+ def and_(self, left_result, right_result):
+ return left_result and right_result
+
+ def or_(self, left_result, right_result):
+ return left_result or right_result
+
+ def is_null(self, ref):
+ if not self.stats[ref.pos].contains_null():
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def not_null(self, ref):
+ lower_bound = self.stats[ref.pos].lower_bound()
+ if lower_bound is None:
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def lt(self, ref, lit):
+ lower_bound = self.stats[ref.pos].lower_bound()
+ if lower_bound is None:
+ return ROWS_CANNOT_MATCH
+
+ lower = Conversions.from_byte_buffer(ref.type, lower_bound)
+
+ if lower >= lit.value:
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def lt_eq(self, ref, lit):
+ lower_bound = self.stats[ref.pos].lower_bound()
+ if lower_bound is None:
+ return ROWS_CANNOT_MATCH
+
+ lower = Conversions.from_byte_buffer(ref.type, lower_bound)
+
+ if lower > lit.value:
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def gt(self, ref, lit):
+ upper_bound = self.stats[ref.pos].upper_bound()
+ if upper_bound is None:
+ return ROWS_CANNOT_MATCH
+
+ upper = Conversions.from_byte_buffer(ref.type, upper_bound)
+
+ if upper <= lit.value:
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def gt_eq(self, ref, lit):
+ upper_bound = self.stats[ref.pos].upper_bound()
+ if upper_bound is None:
+ return ROWS_CANNOT_MATCH
+
+ upper = Conversions.from_byte_buffer(ref.type, upper_bound)
+
+ if upper < lit.value:
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def eq(self, ref, lit):
+ field_stats = self.stats[ref.pos]
+ if field_stats.lower_bound() is None:
+ return ROWS_CANNOT_MATCH
+
+ lower = Conversions.from_byte_buffer(ref.type, field_stats.lower_bound())
+ if lower > lit.value:
+ return ROWS_CANNOT_MATCH
+
+ upper = Conversions.from_byte_buffer(ref.type, field_stats.upper_bound())
+
+ if upper < lit.value:
+ return ROWS_CANNOT_MATCH
+
+ return ROWS_MIGHT_MATCH
+
+ def not_eq(self, ref, lit):
+ return ROWS_MIGHT_MATCH
+
+ def in_(self, ref, lit):
+ return ROWS_MIGHT_MATCH
+
+ def not_in(self, ref, lit):
+ return ROWS_MIGHT_MATCH
diff --git a/python/iceberg/api/expressions/inclusive_metrics_evaluator.py b/python/iceberg/api/expressions/inclusive_metrics_evaluator.py
index 3134831..ac54935 100644
--- a/python/iceberg/api/expressions/inclusive_metrics_evaluator.py
+++ b/python/iceberg/api/expressions/inclusive_metrics_evaluator.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+import threading
+
from .expressions import Expressions, ExpressionVisitors
from ..expressions.binder import Binder
from ..types import Conversions
@@ -22,21 +24,21 @@ from ..types import Conversions
class InclusiveMetricsEvaluator(object):
+ def visitor(self):
+ if not hasattr(self.thread_local_data, "visitors"):
+ self.thread_local_data.visitors = MetricsEvalVisitor(self.expr, self.schema, self.struct)
+
+ return self.thread_local_data.visitors
+
def __init__(self, schema, unbound, case_sensitive=True):
self.schema = schema
self.struct = schema.as_struct()
self.case_sensitive = case_sensitive
self.expr = Binder.bind(self.struct, Expressions.rewrite_not(unbound), case_sensitive)
- self._visitors = None
-
- def _visitor(self):
- if self._visitors is None:
- self._visitors = MetricsEvalVisitor(self.expr, self.schema, self.struct)
-
- return self._visitors
+ self.thread_local_data = threading.local()
def eval(self, file):
- return self._visitor().eval(file)
+ return self.visitor().eval(file)
class MetricsEvalVisitor(ExpressionVisitors.BoundExpressionVisitor):
diff --git a/python/iceberg/api/expressions/residual_evaluator.py b/python/iceberg/api/expressions/residual_evaluator.py
new file mode 100644
index 0000000..917c49d
--- /dev/null
+++ b/python/iceberg/api/expressions/residual_evaluator.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+from .expressions import Expressions, ExpressionVisitors
+from .predicate import BoundPredicate, Predicate, UnboundPredicate
+
+
+class ResidualEvaluator(object):
+
+ def visitor(self):
+ if not hasattr(self.thread_local_data, "visitors"):
+ self.thread_local_data.visitors = ResidualVisitor()
+
+ return self.thread_local_data.visitors
+
+ def __init__(self, spec, expr):
+ self._spec = spec
+ self._expr = expr
+ self.thread_local_data = threading.local()
+
+ def residual_for(self, partition_data):
+ return self.visitor().eval(partition_data)
+
+
+class ResidualVisitor(ExpressionVisitors.BoundExpressionVisitor):
+
+ def __init__(self):
+ self.struct = None
+
+ def eval(self, struct):
+ self.struct = struct
+
+ def always_true(self):
+ return Expressions.always_true()
+
+ def always_false(self):
+ return Expressions.always_false()
+
+ def is_null(self, ref):
+ return self.always_true() if ref.get(self.struct) is None else self.always_false()
+
+ def not_null(self, ref):
+ return self.always_true() if ref.get(self.struct) is not None else self.always_false()
+
+ def lt(self, ref, lit):
+ return self.always_true() if ref.get(self.struct) < lit.value else self.always_false()
+
+ def lt_eq(self, ref, lit):
+ return self.always_true() if ref.get(self.struct) <= lit.value else self.always_false()
+
+ def gt(self, ref, lit):
+ return self.always_true() if ref.get(self.struct) > lit.value else self.always_false()
+
+ def gt_eq(self, ref, lit):
+ return self.always_true() if ref.get(self.struct) >= lit.value else self.always_false()
+
+ def eq(self, ref, lit):
+ return self.always_true() if ref.get(self.struct) == lit.value else self.always_false()
+
+ def not_eq(self, ref, lit):
+ return self.always_true() if ref.get(self.struct) != lit.value else self.always_false()
+
+ def not_(self, result):
+ return Expressions.not_(result)
+
+ def and_(self, left_result, right_result):
+ return Expressions.and_(left_result, right_result)
+
+ def or_(self, left_result, right_result):
+ return Expressions.or_(left_result, right_result)
+
+ def predicate(self, pred):
+ if isinstance(pred, BoundPredicate):
+ return self.bound_predicate(pred)
+ elif isinstance(pred, UnboundPredicate):
+ return self.unbound_predicate(pred)
+
+ raise RuntimeError("Invalid predicate argument %s" % pred)
+
+ def bound_predicate(self, pred):
+ part = self.spec.get_field_by_source_id(pred.ref.field_id)
+ if part is None:
+ return pred
+
+ strict_projection = part.transform.project_strict(part.name, pred)
+ if strict_projection is None:
+ bound = strict_projection.bind(self.spec.partition_type())
+ if isinstance(bound, BoundPredicate):
+ return super(ResidualVisitor, self).predicate(bound)
+ return bound
+
+ return pred
+
+ def unbound_predicate(self, pred):
+ bound = pred.bind(self.spec.schema.as_struct())
+
+ if isinstance(bound, BoundPredicate):
+ bound_residual = self.predicate(bound)
+ if isinstance(bound_residual, Predicate):
+ return pred
+ return bound_residual
+
+ return bound
diff --git a/python/iceberg/api/expressions/strict_metrics_evaluator.py b/python/iceberg/api/expressions/strict_metrics_evaluator.py
index 58b7230..c8c8a78 100644
--- a/python/iceberg/api/expressions/strict_metrics_evaluator.py
+++ b/python/iceberg/api/expressions/strict_metrics_evaluator.py
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+import threading
+
from .expressions import Expressions, ExpressionVisitors
from ..expressions.binder import Binder
from ..types import Conversions
@@ -22,20 +24,22 @@ from ..types import Conversions
class StrictMetricsEvaluator(object):
+ def visitor(self):
+ if not hasattr(self.thread_local_data, "visitors"):
+ self.thread_local_data.visitors = StrictMetricsEvaluator.MetricsEvalVisitor(self.expr,
+ self.schema,
+ self.struct)
+
+ return self.thread_local_data.visitors
+
def __init__(self, schema, unbound):
self.schema = schema
self.struct = schema.as_struct()
self.expr = Binder.bind(self.struct, Expressions.rewrite_not(unbound))
- self._visitors = None
-
- def _visitor(self):
- if self._visitors is None:
- self._visitors = StrictMetricsEvaluator.MetricsEvalVisitor(self.expr, self.schema, self.struct)
-
- return self._visitors
+ self.thread_local_data = threading.local()
def eval(self, file):
- return self._visitor().eval(file)
+ return self.visitor().eval(file)
class MetricsEvalVisitor(ExpressionVisitors.BoundExpressionVisitor):
ROWS_MUST_MATCH = True
diff --git a/python/tests/api/expressions/test_inclusive_manifest_evaluator.py b/python/tests/api/expressions/test_inclusive_manifest_evaluator.py
new file mode 100644
index 0000000..165ed2a
--- /dev/null
+++ b/python/tests/api/expressions/test_inclusive_manifest_evaluator.py
@@ -0,0 +1,166 @@
+
+from iceberg.api.expressions import Expressions, InclusiveManifestEvaluator
+from iceberg.exceptions import ValidationException
+import pytest
+
+
+@pytest.mark.parametrize("expression,expected", [
+ (Expressions.not_null("all_nulls"), False),
+ (Expressions.not_null("some_nulls"), True),
+ (Expressions.not_null("no_nulls"), True)])
+def test_all_nulls(inc_man_spec, inc_man_file, expression, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, expression).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("expression,expected", [
+ (Expressions.is_null("all_nulls"), True),
+ (Expressions.is_null("some_nulls"), True),
+ (Expressions.is_null("no_nulls"), False)])
+def test_no_nulls(inc_man_spec, inc_man_file, expression, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, expression).eval(inc_man_file) == expected
+
+
+def test_missing_column(inc_man_spec, inc_man_file):
+ with pytest.raises(ValidationException):
+ InclusiveManifestEvaluator(inc_man_spec, Expressions.less_than("missing", 5)).eval(inc_man_file)
+
+
+@pytest.mark.parametrize("expression", [
+ Expressions.less_than("id", 5),
+ Expressions.less_than_or_equal("id", 30),
+ Expressions.equal("id", 70),
+ Expressions.greater_than("id", 78),
+ Expressions.greater_than_or_equal("id", 90),
+ Expressions.not_equal("id", 101),
+ Expressions.less_than_or_equal("id", 30),
+ Expressions.is_null("id"),
+ Expressions.not_null("id")])
+def test_missing_stats(inc_man_spec, inc_man_file_ns, expression):
+ assert InclusiveManifestEvaluator(inc_man_spec, expression).eval(inc_man_file_ns)
+
+
+@pytest.mark.parametrize("expression, expected", [
+ (Expressions.less_than("id", 5), True),
+ (Expressions.greater_than("id", 5), False)])
+def test_not(inc_man_spec, inc_man_file, expression, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, Expressions.not_(expression)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("expr1, expr2, expected", [
+ (Expressions.less_than("id", 5), Expressions.greater_than_or_equal("id", 0), False),
+ (Expressions.greater_than("id", 5), Expressions.less_than_or_equal("id", 30), True)])
+def test_and(inc_man_spec, inc_man_file, expr1, expr2, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, Expressions.and_(expr1, expr2)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("expr1, expr2, expected", [
+ (Expressions.less_than("id", 5), Expressions.greater_than_or_equal("id", 80), False),
+ (Expressions.less_than("id", 5), Expressions.greater_than_or_equal("id", 60), True)])
+def test_or(inc_man_spec, inc_man_file, expr1, expr2, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, Expressions.or_(expr1, expr2)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, False),
+ (30, False),
+ (31, True),
+ (79, True)])
+def test_int_lt(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, Expressions.less_than("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, False),
+ (29, False),
+ (30, True),
+ (79, True)])
+def test_int_lt_eq(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.less_than_or_equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (85, False),
+ (79, False),
+ (78, True),
+ (75, True)])
+def test_int_gt(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec, Expressions.greater_than("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (85, False),
+ (80, False),
+ (79, True),
+ (75, True)])
+def test_int_gt_eq(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.greater_than_or_equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, False),
+ (29, False),
+ (30, True),
+ (75, True),
+ (79, True),
+ (80, False),
+ (85, False)])
+def test_int_eq(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, True),
+ (29, True),
+ (30, True),
+ (75, True),
+ (79, True),
+ (80, True),
+ (85, True)])
+def test_int_not_eq(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.not_equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, True),
+ (29, True),
+ (30, True),
+ (75, True),
+ (79, True),
+ (80, True),
+ (85, True)])
+def test_int_not_eq_rewritten(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.not_(Expressions.equal("id", val))).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, True),
+ (29, True),
+ (30, True),
+ (75, True),
+ (79, True),
+ (80, True),
+ (85, True)])
+def test_case_insensitive_int_not_eq_rewritten(inc_man_spec, inc_man_file, val, expected):
+ assert InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.not_(Expressions.equal("ID", val)),
+ case_sensitive=False).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+ (5, True),
+ (29, True),
+ (30, True),
+ (75, True),
+ (79, True),
+ (80, True),
+ (85, True)])
+def test_case_sensitive_int_not_eq_rewritten(inc_man_spec, inc_man_file, val, expected):
+ with pytest.raises(ValidationException):
+ InclusiveManifestEvaluator(inc_man_spec,
+ Expressions.not_(Expressions.equal("ID", val)),
+ case_sensitive=True).eval(inc_man_file) == expected