You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/05 23:30:41 UTC

[incubator-iceberg] branch master updated: Adding InclusiveManifestEvaluator and ResidualEvaluator (#205)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f2e8b78  Adding InclusiveManifestEvaluator and ResidualEvaluator (#205)
f2e8b78 is described below

commit f2e8b78f6bffbe953f05547fed5ee572a658fb17
Author: TGooch44 <te...@gmail.com>
AuthorDate: Wed Jun 5 16:30:37 2019 -0700

    Adding InclusiveManifestEvaluator and ResidualEvaluator (#205)
---
 python/iceberg/api/expressions/__init__.py         |   4 +
 python/iceberg/api/expressions/evaluator.py        |  15 +-
 .../expressions/inclusive_manifest_evaluator.py    | 161 ++++++++++++++++++++
 .../api/expressions/inclusive_metrics_evaluator.py |  18 ++-
 .../iceberg/api/expressions/residual_evaluator.py  | 119 +++++++++++++++
 .../api/expressions/strict_metrics_evaluator.py    |  20 ++-
 .../test_inclusive_manifest_evaluator.py           | 166 +++++++++++++++++++++
 7 files changed, 481 insertions(+), 22 deletions(-)

diff --git a/python/iceberg/api/expressions/__init__.py b/python/iceberg/api/expressions/__init__.py
index fdc5448..4c2189f 100644
--- a/python/iceberg/api/expressions/__init__.py
+++ b/python/iceberg/api/expressions/__init__.py
@@ -37,6 +37,7 @@ __all__ = ["ABOVE_MAX",
            "FixedLiteral",
            "FloatLiteral",
            "inclusive",
+           "InclusiveManifestEvaluator",
            "InclusiveMetricsEvaluator",
            "InclusiveProjection",
            "IntegerLiteral",
@@ -52,6 +53,7 @@ __all__ = ["ABOVE_MAX",
            "Or",
            "Predicate",
            "Reference",
+           "ResidualEvaluator",
            "strict",
            "StrictMetricsEvaluator",
            "StrictProjection",
@@ -73,6 +75,7 @@ from .expression import (And,
                          TRUE,
                          TrueExp)
 from .expressions import Expressions, ExpressionVisitors
+from .inclusive_manifest_evaluator import InclusiveManifestEvaluator
 from .inclusive_metrics_evaluator import InclusiveMetricsEvaluator
 from .java_variables import (JAVA_MAX_FLOAT,
                              JAVA_MAX_INT,
@@ -102,4 +105,5 @@ from .projections import (inclusive,
 from .reference import (BoundReference,
                         NamedReference,
                         Reference)
+from .residual_evaluator import ResidualEvaluator
 from .strict_metrics_evaluator import StrictMetricsEvaluator
diff --git a/python/iceberg/api/expressions/evaluator.py b/python/iceberg/api/expressions/evaluator.py
index 340f352..3b0bbc6 100644
--- a/python/iceberg/api/expressions/evaluator.py
+++ b/python/iceberg/api/expressions/evaluator.py
@@ -15,22 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import threading
+
 from .binder import Binder
 from .expressions import ExpressionVisitors
 
 
 class Evaluator(object):
+    # One EvalVisitor per thread, shared by every Evaluator instance; eval()
+    # hands the visitor this evaluator's bound expression on each call.
+    THREAD_LOCAL_DATA = threading.local()
+
+    def visitor(self):
+        if not hasattr(Evaluator.THREAD_LOCAL_DATA, "visitors"):
+            Evaluator.THREAD_LOCAL_DATA.visitors = Evaluator.EvalVisitor()
+
+        return Evaluator.THREAD_LOCAL_DATA.visitors
 
     def __init__(self, struct, unbound, case_sensitive=True):
         self.expr = Binder.bind(struct, unbound, case_sensitive)
-        self.visitors = None
 
-    def visitor(self):
-        if self.visitors is None:
-            self.visitors = Evaluator.EvalVisitor()
-
-        return self.visitors
-
     def eval(self, data):
         return self.visitor().eval(data, self.expr)
 
diff --git a/python/iceberg/api/expressions/inclusive_manifest_evaluator.py b/python/iceberg/api/expressions/inclusive_manifest_evaluator.py
new file mode 100644
index 0000000..e1c55fb
--- /dev/null
+++ b/python/iceberg/api/expressions/inclusive_manifest_evaluator.py
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+from .binder import Binder
+from .expressions import Expressions, ExpressionVisitors
+from .projections import inclusive
+from ..types import Conversions
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+
+
+class InclusiveManifestEvaluator(object):
+    """Evaluate a row filter against the partition summaries of a manifest.
+
+    The filter is projected onto the partition spec with an inclusive
+    projection, rewritten with ``Expressions.rewrite_not`` and bound to the
+    spec's partition type. ``eval`` returns ROWS_CANNOT_MATCH (False) only
+    when the manifest's partition summaries rule out every row, and
+    ROWS_MIGHT_MATCH (True) otherwise.
+    """
+
+    def visitor(self):
+        """Return the calling thread's visitor, creating it on first use."""
+        if not hasattr(self.thread_local_data, "visitors"):
+            self.thread_local_data.visitors = ManifestEvalVisitor(self.expr)
+
+        return self.thread_local_data.visitors
+
+    def __init__(self, spec, row_filter, case_sensitive=True):
+        self.struct = spec.partition_type()
+        self.expr = Binder.bind(self.struct,
+                                Expressions.rewrite_not(inclusive(spec, case_sensitive=case_sensitive)
+                                                        .project(row_filter)),
+                                case_sensitive=case_sensitive)
+        # The visitor keeps the manifest being evaluated in its own state, so
+        # every thread gets a separate instance.
+        self.thread_local_data = threading.local()
+
+    def eval(self, manifest):
+        """Return ROWS_MIGHT_MATCH or ROWS_CANNOT_MATCH for ``manifest``."""
+        return self.visitor().eval(manifest)
+
+
+class ManifestEvalVisitor(ExpressionVisitors.BoundExpressionVisitor):
+    """Evaluate a bound partition expression against manifest partition summaries.
+
+    During ``eval``, ``self.stats`` holds ``manifest.partitions``; each
+    predicate method reads the summary of its partition field with
+    ``self.stats[ref.pos]`` and returns ROWS_CANNOT_MATCH only when that
+    summary rules out every row, otherwise ROWS_MIGHT_MATCH.
+    """
+
+    def __init__(self, expr):
+        self.expr = expr
+        self.stats = None
+
+    def eval(self, manifest):
+        """Evaluate the bound expression against ``manifest.partitions``.
+
+        A manifest without partition summaries cannot be pruned, so it always
+        evaluates to ROWS_MIGHT_MATCH.
+        """
+        self.stats = manifest.partitions
+        if self.stats is None:
+            return ROWS_MIGHT_MATCH
+
+        return ExpressionVisitors.visit(self.expr, self)
+
+    def always_true(self):
+        return ROWS_MIGHT_MATCH
+
+    def always_false(self):
+        return ROWS_CANNOT_MATCH
+
+    def not_(self, result):
+        return not result
+
+    def and_(self, left_result, right_result):
+        return left_result and right_result
+
+    def or_(self, left_result, right_result):
+        return left_result or right_result
+
+    def is_null(self, ref):
+        # Only a field whose summary recorded a null can satisfy IS NULL.
+        if not self.stats[ref.pos].contains_null():
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def not_null(self, ref):
+        # A missing lower bound is treated as "the field holds only nulls".
+        if self.stats[ref.pos].lower_bound() is None:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def lt(self, ref, lit):
+        lower = self._lower_bound(ref)
+        if lower is None or lower >= lit.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def lt_eq(self, ref, lit):
+        lower = self._lower_bound(ref)
+        if lower is None or lower > lit.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def gt(self, ref, lit):
+        upper = self._upper_bound(ref)
+        if upper is None or upper <= lit.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def gt_eq(self, ref, lit):
+        upper = self._upper_bound(ref)
+        if upper is None or upper < lit.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def eq(self, ref, lit):
+        lower = self._lower_bound(ref)
+        if lower is None or lower > lit.value:
+            return ROWS_CANNOT_MATCH
+
+        # Only prune on the upper bound when one was recorded; a missing upper
+        # bound gives no information about the values.
+        upper = self._upper_bound(ref)
+        if upper is not None and upper < lit.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    # not_eq, in_ and not_in are never used to prune manifests.
+    def not_eq(self, ref, lit):
+        return ROWS_MIGHT_MATCH
+
+    def in_(self, ref, lit):
+        return ROWS_MIGHT_MATCH
+
+    def not_in(self, ref, lit):
+        return ROWS_MIGHT_MATCH
+
+    def _lower_bound(self, ref):
+        """Return the decoded lower bound of ``ref``'s partition field, or None if absent."""
+        lower_bound = self.stats[ref.pos].lower_bound()
+        if lower_bound is None:
+            return None
+        return Conversions.from_byte_buffer(ref.type, lower_bound)
+
+    def _upper_bound(self, ref):
+        """Return the decoded upper bound of ``ref``'s partition field, or None if absent."""
+        upper_bound = self.stats[ref.pos].upper_bound()
+        if upper_bound is None:
+            return None
+        return Conversions.from_byte_buffer(ref.type, upper_bound)
+
+
+# Alias kept for the original, misspelled class name.
+ManifestEvalVistor = ManifestEvalVisitor
diff --git a/python/iceberg/api/expressions/inclusive_metrics_evaluator.py b/python/iceberg/api/expressions/inclusive_metrics_evaluator.py
index 3134831..ac54935 100644
--- a/python/iceberg/api/expressions/inclusive_metrics_evaluator.py
+++ b/python/iceberg/api/expressions/inclusive_metrics_evaluator.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import threading
+
 from .expressions import Expressions, ExpressionVisitors
 from ..expressions.binder import Binder
 from ..types import Conversions
@@ -22,21 +24,23 @@ from ..types import Conversions
 
 class InclusiveMetricsEvaluator(object):
 
+    def visitor(self):
+        """Return the calling thread's MetricsEvalVisitor, creating it on first use."""
+        if not hasattr(self.thread_local_data, "visitors"):
+            self.thread_local_data.visitors = MetricsEvalVisitor(self.expr, self.schema, self.struct)
+
+        return self.thread_local_data.visitors
+
     def __init__(self, schema, unbound, case_sensitive=True):
         self.schema = schema
         self.struct = schema.as_struct()
         self.case_sensitive = case_sensitive
         self.expr = Binder.bind(self.struct, Expressions.rewrite_not(unbound), case_sensitive)
-        self._visitors = None
-
-    def _visitor(self):
-        if self._visitors is None:
-            self._visitors = MetricsEvalVisitor(self.expr, self.schema, self.struct)
-
-        return self._visitors
+        # Holds one cached MetricsEvalVisitor per thread (see visitor()).
+        self.thread_local_data = threading.local()
 
     def eval(self, file):
-        return self._visitor().eval(file)
+        return self.visitor().eval(file)
 
 
 class MetricsEvalVisitor(ExpressionVisitors.BoundExpressionVisitor):
diff --git a/python/iceberg/api/expressions/residual_evaluator.py b/python/iceberg/api/expressions/residual_evaluator.py
new file mode 100644
index 0000000..917c49d
--- /dev/null
+++ b/python/iceberg/api/expressions/residual_evaluator.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+
+from .expressions import Expressions, ExpressionVisitors
+from .predicate import BoundPredicate, Predicate, UnboundPredicate
+
+
+class ResidualEvaluator(object):
+    """Compute the residual of a row filter for individual partitions.
+
+    For a given partition tuple, predicates that the partition values decide
+    through the spec's strict projection are replaced by always_true or
+    always_false; predicates that cannot be decided from partition data stay
+    in the residual unchanged.
+    """
+
+    def visitor(self):
+        """Return the calling thread's ResidualVisitor, creating it on first use."""
+        if not hasattr(self.thread_local_data, "visitors"):
+            self.thread_local_data.visitors = ResidualVisitor(self._spec, self._expr)
+
+        return self.thread_local_data.visitors
+
+    def __init__(self, spec, expr):
+        self._spec = spec
+        self._expr = expr
+        # ResidualVisitor keeps the partition being evaluated in its own
+        # state, so every thread gets a separate instance.
+        self.thread_local_data = threading.local()
+
+    def residual_for(self, partition_data):
+        """Return the residual expression of the row filter for ``partition_data``.
+
+        ``partition_data`` is the partition tuple; partition values are read
+        from it with ``ref.get``.
+        """
+        return self.visitor().eval(partition_data)
+
+
+class ResidualVisitor(ExpressionVisitors.BoundExpressionVisitor):
+    """Rewrite an expression into its residual for a single partition tuple."""
+
+    def __init__(self, spec=None, expr=None):
+        self.spec = spec
+        self.expr = expr
+        self.struct = None
+
+    def eval(self, struct):
+        """Return the residual of ``self.expr`` for the partition tuple ``struct``."""
+        self.struct = struct
+        return ExpressionVisitors.visit(self.expr, self)
+
+    def always_true(self):
+        return Expressions.always_true()
+
+    def always_false(self):
+        return Expressions.always_false()
+
+    # The predicate methods below are reached only for predicates bound to the
+    # partition type (see bound_predicate), so ref.get(self.struct) reads a
+    # partition value and each predicate is decided outright.
+
+    def is_null(self, ref):
+        return self.always_true() if ref.get(self.struct) is None else self.always_false()
+
+    def not_null(self, ref):
+        return self.always_true() if ref.get(self.struct) is not None else self.always_false()
+
+    def lt(self, ref, lit):
+        return self._compare(ref, lambda value: value < lit.value)
+
+    def lt_eq(self, ref, lit):
+        return self._compare(ref, lambda value: value <= lit.value)
+
+    def gt(self, ref, lit):
+        return self._compare(ref, lambda value: value > lit.value)
+
+    def gt_eq(self, ref, lit):
+        return self._compare(ref, lambda value: value >= lit.value)
+
+    def eq(self, ref, lit):
+        return self.always_true() if ref.get(self.struct) == lit.value else self.always_false()
+
+    def not_eq(self, ref, lit):
+        return self.always_true() if ref.get(self.struct) != lit.value else self.always_false()
+
+    def not_(self, result):
+        return Expressions.not_(result)
+
+    def and_(self, left_result, right_result):
+        return Expressions.and_(left_result, right_result)
+
+    def or_(self, left_result, right_result):
+        return Expressions.or_(left_result, right_result)
+
+    def _compare(self, ref, test):
+        """Decide an ordering comparison ``test`` against the partition value of ``ref``."""
+        value = ref.get(self.struct)
+        if value is None:
+            # Treat a null partition value as not satisfying the comparison;
+            # comparing None would raise TypeError on Python 3.
+            return self.always_false()
+        return self.always_true() if test(value) else self.always_false()
+
+    def predicate(self, pred):
+        """Dispatch bound and unbound predicates to their residual handlers."""
+        if isinstance(pred, BoundPredicate):
+            return self.bound_predicate(pred)
+        elif isinstance(pred, UnboundPredicate):
+            return self.unbound_predicate(pred)
+
+        raise RuntimeError("Invalid predicate argument %s" % pred)
+
+    def bound_predicate(self, pred):
+        """Return the residual of a predicate bound to the spec's source schema.
+
+        If the predicate's column is the source of a partition field and the
+        field's transform has a strict projection for it, the projection is
+        bound to the partition type and evaluated against ``self.struct``.
+        Otherwise the predicate cannot be decided here and is returned as is.
+        """
+        part = self.spec.get_field_by_source_id(pred.ref.field_id)
+        if part is None:
+            # Not a partition source column, so it must stay in the residual.
+            return pred
+
+        strict_projection = part.transform.project_strict(part.name, pred)
+        if strict_projection is not None:
+            # NOTE(review): for non-identity transforms a strict projection that
+            # evaluates to false may not prove that no row matches; consider also
+            # checking the inclusive projection before returning always_false.
+            bound = strict_projection.bind(self.spec.partition_type())
+            if isinstance(bound, BoundPredicate):
+                # Base-class dispatch to lt/eq/... above, which decide the predicate.
+                return super(ResidualVisitor, self).predicate(bound)
+            # Binding produced a non-predicate expression; use it directly.
+            return bound
+
+        # No strict projection exists, so the predicate stays in the residual.
+        return pred
+
+    def unbound_predicate(self, pred):
+        """Return the residual of an unbound predicate.
+
+        The predicate is bound to the spec's schema and handled like a bound
+        predicate. If the residual is still a predicate, the original unbound
+        predicate is returned; otherwise the non-predicate residual is returned.
+        """
+        bound = pred.bind(self.spec.schema.as_struct())
+
+        if isinstance(bound, BoundPredicate):
+            bound_residual = self.predicate(bound)
+            if isinstance(bound_residual, Predicate):
+                return pred
+            return bound_residual
+
+        # Binding did not produce a predicate; return whatever it produced.
+        return bound
diff --git a/python/iceberg/api/expressions/strict_metrics_evaluator.py b/python/iceberg/api/expressions/strict_metrics_evaluator.py
index 58b7230..c8c8a78 100644
--- a/python/iceberg/api/expressions/strict_metrics_evaluator.py
+++ b/python/iceberg/api/expressions/strict_metrics_evaluator.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import threading
+
 from .expressions import Expressions, ExpressionVisitors
 from ..expressions.binder import Binder
 from ..types import Conversions
@@ -22,20 +24,23 @@ from ..types import Conversions
 
 class StrictMetricsEvaluator(object):
 
+    def visitor(self):
+        """Return the calling thread's MetricsEvalVisitor, creating it on first use."""
+        if not hasattr(self.thread_local_data, "visitors"):
+            visitor_cls = StrictMetricsEvaluator.MetricsEvalVisitor
+            self.thread_local_data.visitors = visitor_cls(self.expr, self.schema, self.struct)
+
+        return self.thread_local_data.visitors
+
-    def __init__(self, schema, unbound):
+    def __init__(self, schema, unbound, case_sensitive=True):
         self.schema = schema
         self.struct = schema.as_struct()
-        self.expr = Binder.bind(self.struct, Expressions.rewrite_not(unbound))
-        self._visitors = None
-
-    def _visitor(self):
-        if self._visitors is None:
-            self._visitors = StrictMetricsEvaluator.MetricsEvalVisitor(self.expr, self.schema, self.struct)
-
-        return self._visitors
+        self.case_sensitive = case_sensitive
+        self.expr = Binder.bind(self.struct, Expressions.rewrite_not(unbound), case_sensitive)
+        self.thread_local_data = threading.local()
 
     def eval(self, file):
-        return self._visitor().eval(file)
+        return self.visitor().eval(file)
 
     class MetricsEvalVisitor(ExpressionVisitors.BoundExpressionVisitor):
         ROWS_MUST_MATCH = True
diff --git a/python/tests/api/expressions/test_inclusive_manifest_evaluator.py b/python/tests/api/expressions/test_inclusive_manifest_evaluator.py
new file mode 100644
index 0000000..165ed2a
--- /dev/null
+++ b/python/tests/api/expressions/test_inclusive_manifest_evaluator.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for InclusiveManifestEvaluator using the inc_man_* pytest fixtures."""
+
+import pytest
+
+from iceberg.api.expressions import Expressions, InclusiveManifestEvaluator
+from iceberg.exceptions import ValidationException
+
+
+@pytest.mark.parametrize("expression,expected", [
+    (Expressions.not_null("all_nulls"), False),
+    (Expressions.not_null("some_nulls"), True),
+    (Expressions.not_null("no_nulls"), True)])
+def test_all_nulls(inc_man_spec, inc_man_file, expression, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, expression).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("expression,expected", [
+    (Expressions.is_null("all_nulls"), True),
+    (Expressions.is_null("some_nulls"), True),
+    (Expressions.is_null("no_nulls"), False)])
+def test_no_nulls(inc_man_spec, inc_man_file, expression, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, expression).eval(inc_man_file) == expected
+
+
+def test_missing_column(inc_man_spec, inc_man_file):
+    with pytest.raises(ValidationException):
+        InclusiveManifestEvaluator(inc_man_spec, Expressions.less_than("missing", 5)).eval(inc_man_file)
+
+
+@pytest.mark.parametrize("expression", [
+    Expressions.less_than("id", 5),
+    Expressions.less_than_or_equal("id", 30),
+    Expressions.equal("id", 70),
+    Expressions.greater_than("id", 78),
+    Expressions.greater_than_or_equal("id", 90),
+    Expressions.not_equal("id", 101),
+    Expressions.is_null("id"),
+    Expressions.not_null("id")])
+def test_missing_stats(inc_man_spec, inc_man_file_ns, expression):
+    assert InclusiveManifestEvaluator(inc_man_spec, expression).eval(inc_man_file_ns)
+
+
+@pytest.mark.parametrize("expression, expected", [
+    (Expressions.less_than("id", 5), True),
+    (Expressions.greater_than("id", 5), False)])
+def test_not(inc_man_spec, inc_man_file, expression, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, Expressions.not_(expression)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("expr1, expr2, expected", [
+    (Expressions.less_than("id", 5), Expressions.greater_than_or_equal("id", 0), False),
+    (Expressions.greater_than("id", 5), Expressions.less_than_or_equal("id", 30), True)])
+def test_and(inc_man_spec, inc_man_file, expr1, expr2, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, Expressions.and_(expr1, expr2)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("expr1, expr2, expected", [
+    (Expressions.less_than("id", 5), Expressions.greater_than_or_equal("id", 80), False),
+    (Expressions.less_than("id", 5), Expressions.greater_than_or_equal("id", 60), True)])
+def test_or(inc_man_spec, inc_man_file, expr1, expr2, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, Expressions.or_(expr1, expr2)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (5, False),
+    (30, False),
+    (31, True),
+    (79, True)])
+def test_int_lt(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, Expressions.less_than("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (5, False),
+    (29, False),
+    (30, True),
+    (79, True)])
+def test_int_lt_eq(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec,
+                                      Expressions.less_than_or_equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (85, False),
+    (79, False),
+    (78, True),
+    (75, True)])
+def test_int_gt(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec, Expressions.greater_than("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (85, False),
+    (80, False),
+    (79, True),
+    (75, True)])
+def test_int_gt_eq(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec,
+                                      Expressions.greater_than_or_equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (5, False),
+    (29, False),
+    (30, True),
+    (75, True),
+    (79, True),
+    (80, False),
+    (85, False)])
+def test_int_eq(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec,
+                                      Expressions.equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (5, True),
+    (29, True),
+    (30, True),
+    (75, True),
+    (79, True),
+    (80, True),
+    (85, True)])
+def test_int_not_eq(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec,
+                                      Expressions.not_equal("id", val)).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (5, True),
+    (29, True),
+    (30, True),
+    (75, True),
+    (79, True),
+    (80, True),
+    (85, True)])
+def test_int_not_eq_rewritten(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec,
+                                      Expressions.not_(Expressions.equal("id", val))).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val, expected", [
+    (5, True),
+    (29, True),
+    (30, True),
+    (75, True),
+    (79, True),
+    (80, True),
+    (85, True)])
+def test_case_insensitive_int_not_eq_rewritten(inc_man_spec, inc_man_file, val, expected):
+    assert InclusiveManifestEvaluator(inc_man_spec,
+                                      Expressions.not_(Expressions.equal("ID", val)),
+                                      case_sensitive=False).eval(inc_man_file) == expected
+
+
+@pytest.mark.parametrize("val", [5, 29, 30, 75, 79, 80, 85])
+def test_case_sensitive_int_not_eq_rewritten(inc_man_spec, inc_man_file, val):
+    # Binding the upper-case "ID" is expected to fail in case-sensitive mode,
+    # so there is no result to compare against.
+    with pytest.raises(ValidationException):
+        InclusiveManifestEvaluator(inc_man_spec,
+                                   Expressions.not_(Expressions.equal("ID", val)),
+                                   case_sensitive=True).eval(inc_man_file)