You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by ri...@apache.org on 2015/12/16 23:30:11 UTC

incubator-madlib git commit: Path: Match a pattern in a subset of partition

Repository: incubator-madlib
Updated Branches:
  refs/heads/master 30e92868e -> 3fb26da54


Path: Match a pattern in a subset of partition

JIRA: MADLIB-916

This commit adds the ability to match a pattern against a contiguous
subset of a partition instead of only against the complete partition.
For each partition, the per-row symbols are array-aggregated (in order)
into a single string, and the position and length of the match within
that string are located. These are then used to slice a parallel array
of row ids, which identifies the actual rows that make up the match.

This approach only works if each symbol is a single character. Since
users may define symbols as arbitrary strings, each user-supplied symbol
is replaced with a single character (a-z, then 0-9, so at most 36
symbols) both in the per-row symbol assignment and in the pattern expression.

Other: Parent commit (30e9286) closes #9


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/3fb26da5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/3fb26da5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/3fb26da5

Branch: refs/heads/master
Commit: 3fb26da54084ac083bdbc4754d9e3f935dbff475
Parents: 30e9286
Author: Rahul Iyer <ri...@pivotal.io>
Authored: Wed Dec 16 14:18:20 2015 -0800
Committer: Rahul Iyer <ri...@pivotal.io>
Committed: Wed Dec 16 14:18:20 2015 -0800

----------------------------------------------------------------------
 src/ports/postgres/modules/utilities/path.py_in | 151 ++++++++++++++-----
 .../postgres/modules/utilities/path.sql_in      |  42 ++++--
 .../postgres/modules/utilities/test/path.sql_in |  67 ++++++++
 .../modules/utilities/test/pathing.sql_in       |   0
 4 files changed, 213 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3fb26da5/src/ports/postgres/modules/utilities/path.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/path.py_in b/src/ports/postgres/modules/utilities/path.py_in
index 30245fb..cac9560 100644
--- a/src/ports/postgres/modules/utilities/path.py_in
+++ b/src/ports/postgres/modules/utilities/path.py_in
@@ -8,16 +8,20 @@
 import plpy
 import shlex
 import string
+import re
 
 from utilities import unique_string
 from utilities import _assert
+from utilities import add_postfix
+from validate_args import get_cols
 from validate_args import input_tbl_valid
 from validate_args import output_tbl_valid
 # ------------------------------------------------------------------------
 
 
 def path(schema_madlib, source_table, output_table, partition_expr,
-         order_expr, pattern_regex, symbol_expr, result_fun, **kwargs):
+         order_expr, pattern_expr, symbol_expr, agg_func,
+         persist_rows=False, **kwargs):
     """
         Perform regular pattern matching over a sequence of rows.
 
@@ -29,70 +33,144 @@ def path(schema_madlib, source_table, output_table, partition_expr,
         @param order_expr: str, Expression to order the input data
         @param pattern_expr: str, Expression to define the pattern to search for
         @param symbol_expr: str, Definition for each symbol, comma-separated list
-        @param result_fun: str, List of the result functions/aggregates to apply on matched patterns
+        @param agg_func: str, List of the result functions/aggregates to apply on matched patterns
 
     """
-    symbol_str = _build_symbol_str(symbol_expr)
     _validate(source_table, output_table, partition_expr, order_expr,
-              pattern_regex, symbol_str, result_fun)
-    input_view = unique_string()
-
-    sql = """
-        CREATE TABLE {output_table} as
-        WITH {input_view} as (
-            select *,
-                case
-                    {symbol_str}
-                end as symbol
-            FROM {source_table}
-        )
-        SELECT
-            {partition_expr},
-            {result_fun}
-        FROM {input_view}
-        GROUP BY {partition_expr}
-        HAVING
-            array_to_string(array_agg(symbol ORDER BY {order_expr}), '') ~* '^{pattern_regex}'
+              pattern_expr, symbol_expr, agg_func, persist_rows)
+    # replace each occurrence of an original symbol with its single-character
+    # substitute; try symbols in descending order of length so that a shorter
+    # symbol is never substituted inside a longer symbol that contains it
+    sym_mapping, sym_str = _parse_symbol_str(symbol_expr)
+    old_sym_desc = list(reversed(sorted(sym_mapping.keys(), key=len)))
+    replace_pattern = re.compile('|'.join(old_sym_desc), re.IGNORECASE)
+    pattern_expr = replace_pattern.sub(
+        lambda m: sym_mapping[re.escape(string.lower(m.group(0)))],
+        pattern_expr)
+    input_with_id = unique_string('input_with_id')
+    matched_view = unique_string('matched_view')
+    matched_rows = add_postfix(output_table, "_rows") if persist_rows else unique_string('matched_rows')
+    table_or_view = 'TABLE' if persist_rows else 'VIEW'
+    id_col_name = unique_string('id_col')
+    seq_gen = unique_string('seq_gen')
+    all_input_cols_str = ', '.join(get_cols(source_table, schema_madlib))
+
+    # build a new input temp table that contains a sequence
+    plpy.execute("CREATE SEQUENCE " + seq_gen)
+    plpy.execute("""
+                 CREATE TEMP TABLE {input_with_id} AS (
+                     SELECT
+                        *,
+                        nextval('{seq_gen}') AS {id_col_name},
+                        CASE
+                            {sym_str}
+                        END AS symbol
+                     FROM {source_table}
+                 )""".format(**locals()))
+    build_matched_rows = """
+        CREATE {table_or_view} {matched_rows} AS (
+            SELECT {all_input_cols_str}
+            FROM
+                {input_with_id} as q1,
+                (
+                 SELECT
+                    unnest(match_ids[match_position+1:match_position+match_length]) as match_ids
+                 FROM(
+                     SELECT
+                        {partition_expr},
+                        length(substring(array_to_string(
+                                    array_agg(symbol ORDER BY {order_expr}), '')
+                               FROM '(?i)(^.*){pattern_expr}')) as match_position,
+                        length(substring(array_to_string(
+                                    array_agg(symbol ORDER BY {order_expr}), '')
+                               FROM '(?i)^.*({pattern_expr}).*$')) as match_length,
+                        array_agg({id_col_name} ORDER BY {order_expr}) as match_ids
+                     FROM {input_with_id}
+                     GROUP BY {partition_expr}
+                     HAVING
+                        array_to_string(array_agg(symbol ORDER BY {order_expr}), '')
+                            ~* '{pattern_expr}'
+                 ) subq
+                ) q2
+            WHERE q1.{id_col_name} = q2.match_ids
+        );
     """.format(**locals())
-    plpy.execute(sql)
+    plpy.execute(build_matched_rows)
+    plpy.execute("""
+        CREATE TABLE {output_table} AS
+           SELECT
+            {partition_expr},
+            {agg_func}
+           FROM {matched_rows}
+           GROUP BY {partition_expr}
+        """.format(**locals()))
+    if not persist_rows:
+        plpy.execute("DROP VIEW IF EXISTS " + matched_rows)
+    plpy.execute("DROP TABLE IF EXISTS " + input_with_id)
+    plpy.execute("DROP SEQUENCE IF EXISTS " + seq_gen)
 # ------------------------------------------------------------------------------
 
 
 def _validate(source_table, output_table, partition_expr, order_expr,
-              pattern_expr, symbol_expr, result_fun):
+              pattern_expr, symbol_expr, agg_func, persist_rows):
     input_tbl_valid(source_table, 'Path')
     output_tbl_valid(output_table, 'Path')
+    if persist_rows:
+        output_tbl_valid(add_postfix(output_table, "_rows"), 'Path')
+
+    # ensure the expressions are not None or empty strings
     _assert(partition_expr, "Path error: Invalid partition expression")
     _assert(order_expr, "Path error: Invalid order expression")
     _assert(pattern_expr, "Path error: Invalid pattern expression")
     _assert(symbol_expr, "Path error: Invalid symbol expression")
-    _assert(result_fun, "Path error: Invalid result expression")
+    _assert(agg_func, "Path error: Invalid result expression")
 # ----------------------------------------------------------------------
 
 
-def _build_symbol_str(symbol_expr):
-    """ Parse a symbol definition string and return a dictionary of the definitions
+def _parse_symbol_str(symbol_expr):
+    """ Parse symbol definition string to build the WHEN clauses of a CASE
+        expression and return a mapping of the definitions.
+
+        Pattern matching only supports single-character symbols. To allow
+        multi-character symbols as input, each symbol is mapped to a single
+        character, and a dictionary giving the old-to-new mapping is returned.
 
         symbol_expr: A comma-separated string containing symbol definitions of the form:
             <symbol> := <symbol_definition>
         Example:
-            symbol_expr = ('BEFORE:=starttime >= \'0:00:00\'::time and '
-                            'starttime < \'9:30:00\', '
-                           'MARKET:=starttime >= \'9:30:00\'::time and '
-                            'starttime < \'16:00:00\'::time')
+            symbol_expr = ('BEFORE:=start >= \'0:00:00\' and start < \'9:30:00\', '
+                           'MARKET:=start >= \'9:30:00\' and start < \'16:00:00\'')
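+            output = ({'before': 'a',
+                       'market': 'b'},
+                      "WHEN start >= \'0:00:00\' and start < \'9:30:00\' THEN 'a'::text
+                       WHEN start >= \'9:30:00\' and start < \'16:00:00\' THEN 'b'::text")
+            (symbol keys are lower-cased and regex-escaped; WHEN clauses are
+             newline-separated and their order follows dict iteration order)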
     """
+    all_symbols = iter(string.ascii_lowercase + string.digits)
     symbol_expr_parser = shlex.shlex(symbol_expr)
     symbol_expr_parser.wordchars = [i for i in string.printable
                                     if i not in (symbol_expr_parser.quotes + ",")]
     symbol_expr_parser.whitespace = ','
     symbol_splits = list(symbol_expr_parser)
     symbol_definitions = {}
+    symbol_mapping = {}
     for each_sym in symbol_splits:
         each_sym_splits = each_sym.split(":=")
-        if len(each_sym_splits) > 1:
-            symbol_definitions[each_sym_splits[0].strip()] = each_sym_splits[1].strip()
-    return '\n'.join("WHEN {1} THEN '{0}'::text".format(key, val)
-                     for key, val in symbol_definitions.items())
+        if len(each_sym_splits) == 2:
+            old_sym, sym_def = each_sym_splits
+            old_sym = re.escape(old_sym.strip().lower())
+            try:
+                next_sym = all_symbols.next()
+            except StopIteration:
+                plpy.error("Path error: Maximum number of symbols reached.")
+            _assert(old_sym not in symbol_mapping,
+                    "Path error: Multipe definitions of a symbol")
+            symbol_mapping[old_sym] = next_sym
+            symbol_definitions[next_sym] = sym_def
+    return (symbol_mapping,
+            '\n'.join("WHEN {1} THEN '{0}'::text".format(key, val)
+                      for key, val in symbol_definitions.items()))
 # ----------------------------------------------------------------------
 
 
@@ -120,7 +198,8 @@ SELECT {schema_madlib}.path(
     'order_expr',      -- Order expression to sort the tuples of the data table
     'pattern_def',     -- Definition of the path pattern to search for
     'symbol_def',      -- Definition of various symbols used in the pattern definition
-    'result_fun',      -- Aggregate/window functions to be applied on the matched paths
+    'agg_func',        -- Aggregate/window functions to be applied on the matched paths
+    persist_rows       -- Boolean indicating whether to output the matched rows in a table
 );
 """
     return help_string.format(schema_madlib=schema_madlib)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3fb26da5/src/ports/postgres/modules/utilities/path.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/path.sql_in b/src/ports/postgres/modules/utilities/path.sql_in
index 0eeb60d..2dd490d 100644
--- a/src/ports/postgres/modules/utilities/path.sql_in
+++ b/src/ports/postgres/modules/utilities/path.sql_in
@@ -45,7 +45,8 @@ path(
     order_expr,
     pattern,
     symbol,
-    result
+    aggregate_func,
+    persist_rows
 )
 </pre>
 
@@ -93,11 +94,17 @@ for as part of a row sequence. In the SYMBOLS clause, you write a predicate to d
 type of row that matches the symbol.
     </dd>
 
-    <dt>result</dt>
+    <dt>aggregate_func</dt>
     <dd>VARCHAR. A comma-separated list of window functions and aggregates to be
     applied on the matched window.
     </dd>
 
+    <dt>persist_rows</dt>
+    <dd>BOOLEAN, default: FALSE. If TRUE, the matched rows are also written to
+    a second table named <em>output_table</em>_rows (the string "_rows" is
+    appended as a suffix to the value of <em>output_table</em>).
+    </dd>
+
 </dl>
 
 
@@ -175,25 +182,38 @@ FROM sessiontable
 
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(
-    source_table        varchar,
-    output_table        varchar,
-    partition_expr      varchar,
-    order_expr          varchar,
-    pattern_regex       varchar,
-    symbol_expr         varchar,
-    result_fun          varchar
+    source_table        VARCHAR,
+    output_table        VARCHAR,
+    partition_expr      VARCHAR,
+    order_expr          VARCHAR,
+    pattern_expr        VARCHAR,
+    symbol_expr         VARCHAR,
+    agg_func          VARCHAR,
+    persist_rows        BOOLEAN
 ) RETURNS void AS $$
 PythonFunction(utilities, path, path)
 $$ LANGUAGE plpythonu
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
+
 -------------------------------------------------------------------------
--- To Implement: overloaded functions for default arguments ---------------
+-- Overloaded functions for default arguments ---------------
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(
+    source_table        VARCHAR,
+    output_table        VARCHAR,
+    partition_expr      VARCHAR,
+    order_expr          VARCHAR,
+    pattern_expr        VARCHAR,
+    symbol_expr         VARCHAR,
+    agg_func          VARCHAR
+) RETURNS void AS $$
+    SELECT MADLIB_SCHEMA.path($1, $2, $3, $4, $5, $6, $7, FALSE)
+$$ LANGUAGE SQL
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
 
 -------------------------------------------------------------------------
 -- To Implement -----------------------------------------------------------
-
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(message TEXT)
 RETURNS text AS $$
 PythonFunction(utilities, path, path_help_message)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3fb26da5/src/ports/postgres/modules/utilities/test/path.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/path.sql_in b/src/ports/postgres/modules/utilities/test/path.sql_in
new file mode 100644
index 0000000..1700070
--- /dev/null
+++ b/src/ports/postgres/modules/utilities/test/path.sql_in
@@ -0,0 +1,67 @@
+DROP TABLE IF EXISTS weblog, path_output, path_output_rows CASCADE;
+
+CREATE TABLE weblog (event_timestamp TIMESTAMP,
+            user_id INT,
+            age_group INT,
+            income_group INT,
+            gender TEXT,
+            region TEXT,
+            household_size INT,
+            click_event INT,
+            purchase_event INT,
+            revenue FLOAT,
+            margin FLOAT);
+
+INSERT INTO weblog VALUES
+('04/14/2012 23:43:00', 102201, 3, 3, 'Female', 'East', 3, 1, 1, 112, 36),
+('04/14/2012 23:56:00', 101881, 2, 4, 'Male', 'West', 5, 0, 0, 0, 0),
+('04/15/2012 01:04:00', 100821, 1, 4, 'Unknown', 'West', 3, 0, 0, 0, 0),
+('04/15/2012 01:15:00', 101121, 2, 2, 'Unknown', 'West', 4, 0, 0, 0, 0),
+('04/15/2012 02:53:00', 102201, 3, 3, 'Female', 'East', 3, 1, 1, 117, 28),
+('04/15/2012 04:11:00', 103711, 4, 3, 'Female', 'Central', 5, 0, 0, 0, 0),
+('04/15/2012 04:25:00', 100821, 1, 4, 'Unknown', 'West', 3, 1, 1, 91, 28),
+('04/15/2012 06:26:00', 102871, 3, 4, 'Female', 'Central', 5, 0, 0, 0, 0),
+('04/15/2012 06:32:00', 100821, 1, 4, 'Unknown', 'West', 3, 0, 0, 0, 0),
+('04/15/2012 07:02:00', 100821, 1, 4, 'Unknown', 'West', 3, 1, 1, 118, 39),
+('04/15/2012 08:51:00', 102201, 3, 3, 'Female', 'East', 3, 0, 0, 0, 0),
+('04/15/2012 09:28:00', 101121, 2, 2, 'Unknown', 'West', 4, 1, 1, 103, 32),
+('04/15/2012 10:19:00', 103711, 4, 3, 'Female', 'Central', 5, 0, 0, 0, 0),
+('04/15/2012 11:40:00', 100821, 1, 4, 'Unknown', 'West', 3, 0, 0, 0, 0),
+('04/15/2012 12:58:00', 101121, 2, 2, 'Unknown', 'West', 4, 1, 1, 148, 23),
+('04/15/2012 14:18:00', 101121, 2, 2, 'Unknown', 'West', 4, 1, 1, 113, 29),
+('04/15/2012 22:20:00', 101121, 2, 2, 'Unknown', 'West', 4, 1, 1, 108, 38),
+('04/15/2012 23:13:00', 102201, 3, 3, 'Female', 'East', 3, 0, 0, 0, 0),
+('04/15/2012 23:14:00', 103711, 4, 3, 'Female', 'Central', 5, 0, 0, 0, 0),
+('04/16/2012 01:55:00', 101121, 2, 2, 'Unknown', 'West', 4, 0, 0, 0, 0),
+('04/16/2012 02:12:00', 100821, 1, 4, 'Unknown', 'West', 3, 1, 1, 153, 26),
+('04/16/2012 04:20:00', 102201, 3, 3, 'Female', 'East', 3, 0, 0, 0, 0),
+('04/16/2012 05:38:00', 101121, 2, 2, 'Unknown', 'West', 4, 1, 0, 0, 0),
+('04/16/2012 05:44:00', 102201, 3, 3, 'Female', 'East', 3, 1, 0, 0, 0),
+('04/16/2012 05:59:00', 102871, 3, 4, 'Female', 'Central', 5, 1, 0, 0, 0),
+('04/16/2012 09:35:00', 102871, 3, 4, 'Female', 'Central', 5, 1, 0, 0, 0),
+('04/16/2012 10:40:00', 101331, 2, 4, 'Female', 'East', 5, 0, 0, 0, 0),
+('04/16/2012 14:23:00', 102871, 3, 4, 'Female', 'Central', 5, 0, 0, 0, 0),
+('04/16/2012 20:46:00', 101121, 2, 2, 'Unknown', 'West', 4, 1, 1, 131, 28),
+('04/16/2012 21:11:00', 101331, 2, 4, 'Female', 'East', 5, 1, 1, 127, 27),
+('04/16/2012 22:35:00', 101121, 2, 2, 'Unknown', 'West', 4, 0, 0, 0, 0),
+('04/16/2012 23:51:00', 101881, 2, 4, 'Male', 'West', 5, 0, 0, 0, 0),
+('04/16/2012 23:55:00', 101331, 2, 4, 'Female', 'East', 5, 0, 0, 0, 0),
+('04/16/2012 23:56:00', 101331, 2, 4, 'Female', 'East', 5, 1, 0, 0, 0),
+('04/16/2012 23:57:00', 101331, 2, 4, 'Female', 'East', 5, 1, 1, 456, 77);
+
+/*
+SELECT * FROM weblog ORDER BY event_timestamp ASC;
+*/
+SELECT madlib.path(
+     'weblog',              -- Name of the table
+     '"Path_output"',         -- Table name to store the path results
+     'user_id',             -- Partition expression to group the data table
+     'event_timestamp ASC',         -- Order expression to sort the tuples of the data table
+     'I(click){1}(CONV){1}',        -- Definition of the path pattern to search for
+     'I:=click_event=0 AND purchase_event=0, Click:=click_event=1 AND purchase_event=0, Conv:=purchase_event=1',    -- Definition of various symbols used in the pattern definition
+     'COUNT(*)'             -- Aggregate/window functions to be applied on the matched paths
+    ,TRUE
+     );
+------------------------------------------------------------
+
+SELECT * FROM "Path_output";

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/3fb26da5/src/ports/postgres/modules/utilities/test/pathing.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/pathing.sql_in b/src/ports/postgres/modules/utilities/test/pathing.sql_in
deleted file mode 100644
index e69de29..0000000