You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@madlib.apache.org by xt...@apache.org on 2016/02/25 22:27:24 UTC

incubator-madlib git commit: Path: Allow multiple matches in single partition

Repository: incubator-madlib
Updated Branches:
  refs/heads/master 06f1cb893 -> 33c0a72c6


Path: Allow multiple matches in single partition

JIRA: MADLIB-917

Path query was rewritten to use array functions. This was necessary
since multiple matches produce a set of matches for each group. This
set has to be unnested, which was not possible in GPDB since the
GROUP BY placed the executor in a context that does not allow
set-returning functions (SRFs). The solution was to avoid that unnest by
using array functions for all computations and to unnest only at the end,
outside the GROUP BY.

Closes #21


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/33c0a72c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/33c0a72c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/33c0a72c

Branch: refs/heads/master
Commit: 33c0a72c69e9c986067eddad99c8977c7874e9f9
Parents: 06f1cb8
Author: Rahul Iyer <ri...@pivotal.io>
Authored: Thu Dec 24 14:15:36 2015 -0800
Committer: Xiaocheng Tang <xi...@gmail.com>
Committed: Thu Feb 25 13:26:26 2016 -0800

----------------------------------------------------------------------
 doc/mainpage.dox.in                             |   2 +-
 methods/array_ops/src/pg_gp/array_ops.c         | 136 +++++++++
 methods/array_ops/src/pg_gp/array_ops.sql_in    |  28 ++
 .../recursive_partitioning/decision_tree.py_in  |   8 +-
 .../postgres/modules/regress/margins.py_in      |   2 -
 src/ports/postgres/modules/utilities/path.py_in | 280 ++++++++++++++-----
 .../postgres/modules/utilities/path.sql_in      |  71 ++---
 .../postgres/modules/utilities/test/path.sql_in |   2 +-
 .../modules/utilities/validate_args.py_in       |  71 +++--
 9 files changed, 461 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/doc/mainpage.dox.in
----------------------------------------------------------------------
diff --git a/doc/mainpage.dox.in b/doc/mainpage.dox.in
index 3f31f5e..59475fd 100644
--- a/doc/mainpage.dox.in
+++ b/doc/mainpage.dox.in
@@ -233,7 +233,7 @@ Interface and implementation are subject to change.
 
     @defgroup grp_cg Conjugate Gradient
     @defgroup grp_bayes Naive Bayes Classification
-    @defgroup grp_path Pathing functions
+    @defgroup grp_path Path functions
     @defgroup grp_sample Random Sampling
     @defgroup grp_kernmach Support Vector Machines
 @}

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/methods/array_ops/src/pg_gp/array_ops.c
----------------------------------------------------------------------
diff --git a/methods/array_ops/src/pg_gp/array_ops.c b/methods/array_ops/src/pg_gp/array_ops.c
index 57ee3bf..598df85 100644
--- a/methods/array_ops/src/pg_gp/array_ops.c
+++ b/methods/array_ops/src/pg_gp/array_ops.c
@@ -47,6 +47,8 @@ static ArrayType *General_2Array_to_Array(ArrayType *v1, ArrayType *v2,
         Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid));
 static ArrayType *General_Array_to_Array(ArrayType *v1, Datum value,
         Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid));
+static ArrayType *General_Array_to_Cumulative_Array(ArrayType *v1, Datum value,
+        Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid));
 static Datum General_2Array_to_Element(ArrayType *v1, ArrayType *v2,
         Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid),
         Datum(*finalize_function)(Datum,int,Oid));
@@ -491,6 +493,7 @@ array_sum(PG_FUNCTION_ARGS){
     return float8_datum_cast(DatumGetFloat8(res), element_type);
 }
 
+
 /*
  * This function returns sum of the array elements' absolute value.
  */
@@ -893,6 +896,36 @@ array_scalar_add(PG_FUNCTION_ARGS){
 }
 
 /*
+ * This function returns the cumulative sum of the array elements.
+ */
+PG_FUNCTION_INFO_V1(array_cum_sum);
+Datum
+array_cum_sum(PG_FUNCTION_ARGS){
+    if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); }
+
+    ArrayType *v = PG_GETARG_ARRAYTYPE_P(0);
+    ArrayType *res = General_Array_to_Cumulative_Array(v, Float8GetDatum(0.0), element_add);
+    /* An empty input comes back as v itself; freeing it would leave res dangling. */
+    if (res != v) { PG_FREE_IF_COPY(v, 0); }
+    PG_RETURN_ARRAYTYPE_P(res);
+}
+
+/*
+ * This function returns the cumulative product of the array elements.
+ */
+PG_FUNCTION_INFO_V1(array_cum_prod);
+Datum
+array_cum_prod(PG_FUNCTION_ARGS){
+    if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); }
+
+    ArrayType *v = PG_GETARG_ARRAYTYPE_P(0);
+    ArrayType *res = General_Array_to_Cumulative_Array(v, Float8GetDatum(1.0), element_mult);
+    /* An empty input comes back as v itself; freeing it would leave res dangling. */
+    if (res != v) { PG_FREE_IF_COPY(v, 0); }
+    PG_RETURN_ARRAYTYPE_P(res);
+}
+
+/*
  * This function removes elements with specified value from an array.
  */
 PG_FUNCTION_INFO_V1(array_filter);
@@ -1970,3 +2003,106 @@ General_Array_to_Array(
 
     return pgarray;
 }
+
+
+/*
+ * @brief Builds a running (cumulative) fold of an array, e.g. a sum or product.
+ * Raises an ERROR for NULL elements or an unsupported (non-numeric) element type.
+ * @param v1 Input array; int2/int4/int8/float4/float8/numeric elements, no NULLs.
+ * @param initial Float8 datum that seeds the running value (callers pass 0.0 or 1.0).
+ * @param element_function Called per element with the element and the running value.
+ * @returns New array with v1's shape and type; v1 itself when v1 has no dimensions.
+ */
+ArrayType*
+General_Array_to_Cumulative_Array(
+        ArrayType *v1,
+        Datum initial,
+        Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid)) {
+
+    // dimensions (an empty array is handed back untouched, not copied)
+    int ndims1 = ARR_NDIM(v1);
+    if (ndims1 == 0) {
+        elog(WARNING, "input are empty arrays.");
+        return v1;
+    }
+    int ndims = ndims1;
+    int *lbs1 = ARR_LBOUND(v1);
+    int *dims1 = ARR_DIMS(v1);
+    int *dims = (int *) palloc(ndims * sizeof(int));
+    int *lbs = (int *) palloc(ndims * sizeof(int));
+    int i = 0;
+    for (i = 0; i < ndims; i ++) {
+        dims[i] = dims1[i];
+        lbs[i] = lbs1[i];
+    }
+    int nitems = ArrayGetNItems(ndims, dims);
+
+    // nulls
+    if (ARR_HASNULL(v1)) {
+        ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                        errmsg("arrays cannot contain nulls"),
+                        errdetail("Arrays with element value NULL are not allowed.")));
+    }
+
+    // type
+    Oid element_type = ARR_ELEMTYPE(v1);
+    TypeCacheEntry *typentry = lookup_type_cache(element_type,TYPECACHE_CMP_PROC_FINFO);
+    int type_size = typentry->typlen;
+    bool typbyval = typentry->typbyval;
+    char typalign = typentry->typalign;
+
+    // allocate
+    Datum *result = NULL;
+    switch (element_type) {
+        case INT2OID:
+        case INT4OID:
+        case INT8OID:
+        case FLOAT4OID:
+        case FLOAT8OID:
+        case NUMERICOID:
+            result = (Datum *)palloc(nitems * sizeof(Datum));break;
+        default:
+            ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                            errmsg("type is not supported"),
+                            errdetail("Arrays with element type %s are not supported.",
+                                      format_type_be(element_type))));
+            break;
+    }
+    // The running value is kept as a Datum of element_type: that is the type tag
+    // passed to element_function below and the type the results are stored as.
+    Datum *resultp = result;
+    char *dat1 = ARR_DATA_PTR(v1);
+    Datum prev_dat = float8_datum_cast(DatumGetFloat8(initial), element_type);
+    for (i = 0; i < nitems; i ++) {
+        // fetch the current element and advance to the next aligned one
+        Datum elt1 = fetch_att(dat1, typbyval, type_size);
+        dat1 = att_addlength_pointer(dat1, type_size, dat1);
+        dat1 = (char *) att_align_nominal(dat1, typalign);
+
+        *resultp = element_function(elt1,
+                                    element_type,
+                                    elt1,         /* placeholder */
+                                    element_type, /* placeholder */
+                                    prev_dat,
+                                    element_type);
+        prev_dat = *resultp;
+        resultp++;
+    }
+
+    // construct return result
+    ArrayType *pgarray = construct_md_array(result,
+                                            NULL,
+                                            ndims,
+                                            dims,
+                                            lbs,
+                                            element_type,
+                                            type_size,
+                                            typbyval,
+                                            typalign);
+
+    pfree(result);
+    pfree(dims);
+    pfree(lbs);
+
+    return pgarray;
+}

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/methods/array_ops/src/pg_gp/array_ops.sql_in
----------------------------------------------------------------------
diff --git a/methods/array_ops/src/pg_gp/array_ops.sql_in b/methods/array_ops/src/pg_gp/array_ops.sql_in
index 80f50ab..6179f26 100644
--- a/methods/array_ops/src/pg_gp/array_ops.sql_in
+++ b/methods/array_ops/src/pg_gp/array_ops.sql_in
@@ -620,3 +620,31 @@ DROP FUNCTION IF EXISTS MADLIB_SCHEMA.array_contains_null(float8[]) CASCADE;
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_contains_null(x anyarray) RETURNS BOOLEAN
 AS 'MODULE_PATHNAME', 'array_contains_null' LANGUAGE C IMMUTABLE STRICT
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+/**
+ * @brief Computes the cumulative (running) sum of an array: element i of the
+ *        result is the sum of the first i elements of x, so the first element
+ *        is unchanged. An error is raised if x contains NULL elements.
+ *
+ * @param x Array of int2, int4, int8, float4, float8 or numeric values
+ * @returns Array of the same type and dimensions as x with the cumulative sums.
+ *
+ */
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_cum_sum(x anyarray) RETURNS anyarray
+AS 'MODULE_PATHNAME', 'array_cum_sum'
+LANGUAGE C IMMUTABLE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+/**
+ * @brief Computes the cumulative (running) product of an array: element i of
+ *        the result is the product of the first i elements of x, so the first
+ *        element is unchanged. An error is raised if x contains NULL elements.
+ *
+ * @param x Array of int2, int4, int8, float4, float8 or numeric values
+ * @returns Array of the same type and dimensions as x with the cumulative products.
+ *
+ */
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_cum_prod(x anyarray) RETURNS anyarray
+AS 'MODULE_PATHNAME', 'array_cum_prod'
+LANGUAGE C IMMUTABLE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in b/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
index 6fc8689..4d15e43 100644
--- a/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
+++ b/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
@@ -26,7 +26,7 @@ from utilities.validate_args import table_exists
 from utilities.validate_args import table_is_empty
 from utilities.validate_args import columns_exist_in_table
 from utilities.validate_args import is_var_valid
-from utilities.validate_args import _unquote_name
+from utilities.validate_args import unquote_ident
 from utilities.utilities import _assert
 from utilities.utilities import extract_keyvalue_params
 from utilities.utilities import unique_string
@@ -113,12 +113,12 @@ def _get_features_to_use(schema_madlib, training_table_name,
     #  list.
     if grouping_cols:
         group_set = set(split_quoted_delimited_str(grouping_cols))
-        group_set |= set(_unquote_name(i)
+        group_set |= set(unquote_ident(i)
                          for i in split_quoted_delimited_str(grouping_cols))
     else:
         group_set = set()
     other_col_set = set([id_col_name, weights, dependent_variable])
-    other_col_set |= set(_unquote_name(i)
+    other_col_set |= set(unquote_ident(i)
                          for i in [id_col_name, weights, dependent_variable])
 
     if list_of_features.strip() == '*':
@@ -139,7 +139,7 @@ def _get_features_to_use(schema_madlib, training_table_name,
 def _get_col_value(input_dict, col_name):
     """Return value from dict where key could be quoted or unquoted name"""
     return input_dict.get(
-        col_name, input_dict.get(_unquote_name(col_name)))
+        col_name, input_dict.get(unquote_ident(col_name)))
 # -------------------------------------------------------------------------
 
 

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/regress/margins.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/regress/margins.py_in b/src/ports/postgres/modules/regress/margins.py_in
index 6858f7a..15c5f9d 100644
--- a/src/ports/postgres/modules/regress/margins.py_in
+++ b/src/ports/postgres/modules/regress/margins.py_in
@@ -23,8 +23,6 @@ from margins_builder import TermBase
 import re
 #------------------------------------------------------------------------------
 
-m4_changequote(`<!', `!>')
-
 
 def margins(schema_madlib, model_table, out_table, x_design=None,
             source_table=None, marginal_vars=None, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/path.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/path.py_in b/src/ports/postgres/modules/utilities/path.py_in
index 44b4d81..67abf64 100644
--- a/src/ports/postgres/modules/utilities/path.py_in
+++ b/src/ports/postgres/modules/utilities/path.py_in
@@ -5,9 +5,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,15 +27,20 @@ import shlex
 import string
 import re
 
+from control import MinWarning
 from utilities import unique_string
 from utilities import _assert
 from utilities import add_postfix
 from validate_args import get_cols
 from validate_args import input_tbl_valid
 from validate_args import output_tbl_valid
+from validate_args import quote_ident
 # ------------------------------------------------------------------------
 
 
+m4_changequote(`<!', `!>')
+
+
 def path(schema_madlib, source_table, output_table, partition_expr,
          order_expr, pattern_expr, symbol_expr, agg_func,
          persist_rows=False, **kwargs):
@@ -53,82 +58,201 @@ def path(schema_madlib, source_table, output_table, partition_expr,
         @param agg_func: str, List of the result functions/aggregates to apply on matched patterns
 
     """
-    if not partition_expr:
-        partition_expr = "1 = 1"
-        as_partition_expr = "1 = 1 as ALL"
-    else:
-        as_partition_expr = partition_expr
-    _validate(source_table, output_table, partition_expr, order_expr,
-              pattern_expr, symbol_expr, agg_func, persist_rows)
-    # replace each occurence of the original symbol with the new
-    # perform this operation in descending order of length to avoid substituting
-    # subset of any symbol
-    sym_mapping, sym_str = _parse_symbol_str(symbol_expr)
-    old_sym_desc = list(reversed(sorted(sym_mapping.keys(), key=len)))
-    replace_pattern = re.compile('|'.join(old_sym_desc), re.IGNORECASE)
-    pattern_expr = replace_pattern.sub(
-        lambda m: sym_mapping[re.escape(string.lower(m.group(0)))],
-        pattern_expr)
-    input_with_id = unique_string('input_with_id')
-    matched_view = unique_string('matched_view')
-    matched_rows = add_postfix(output_table, "_tuples") if persist_rows else unique_string('matched_rows')
-    table_or_view = 'TABLE' if persist_rows else 'VIEW'
-    id_col_name = unique_string('id_col')
-    seq_gen = unique_string('seq_gen')
-    all_input_cols_str = ', '.join(get_cols(source_table, schema_madlib))
-
-    # build a new input temp table that contains a sequence
-    plpy.execute("CREATE SEQUENCE " + seq_gen)
-    plpy.execute("""
-                 CREATE TEMP TABLE {input_with_id} AS (
-                     SELECT
-                        *,
-                        nextval('{seq_gen}') AS {id_col_name},
-                        CASE
-                            {sym_str}
-                        END AS symbol
-                     FROM {source_table}
-                 )""".format(**locals()))
-    build_matched_rows = """
-        CREATE {table_or_view} {matched_rows} AS (
+    with MinWarning("error"):
+        if not partition_expr:
+            partition_expr = "1 = 1"
+            as_partition_expr = "1 = 1 as all"
+        else:
+            as_partition_expr = partition_expr
+        _validate(source_table, output_table, partition_expr, order_expr,
+                  pattern_expr, symbol_expr, agg_func, persist_rows)
+
+        # replace each occurrence of the original symbol with the new
+        # perform this operation in descending order of length to avoid substituting
+        # subset of any symbol
+        sym_mapping, sym_str = _parse_symbol_str(symbol_expr)
+        old_sym_desc = list(reversed(sorted(sym_mapping.keys(), key=len)))
+        replace_pattern = re.compile('|'.join(old_sym_desc), re.IGNORECASE)
+        pattern_expr = replace_pattern.sub(
+            lambda m: sym_mapping[re.escape(string.lower(m.group(0)))],
+            pattern_expr)
+
+        # build variables for intermediate objects
+        input_with_id = unique_string('input_with_id')
+        matched_view = unique_string('matched_view')
+        id_col_name = unique_string('id_col')
+        matched_partitions = unique_string('matched_partitions')
+        seq_gen = unique_string('seq_gen')
+        symbol_name_str = unique_string('symbol')
+        all_input_cols_str = ', '.join(get_cols(source_table, schema_madlib))
+        if persist_rows:
+            matched_rows = add_postfix(output_table, "_tuples")
+            table_or_view = 'TABLE'
+        else:
+            matched_rows = unique_string('matched_rows')
+            table_or_view = 'VIEW'
+
+        # build a new input temp table that contains a sequence and partition columns
+        split_p_cols = [i.strip() for i in partition_expr.split(',')]
+        p_col_names = [unique_string() for i in split_p_cols]
+        p_col_as_str = ','.join(
+            [i + " AS " + j for i, j in zip(split_p_cols, p_col_names)])
+        p_col_name_str = ', '.join(p_col_names)
+        distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
+                                <!"DISTRIBUTED BY ({0})".format(p_col_name_str)!>)
+        plpy.execute("CREATE SEQUENCE " + seq_gen)
+        plpy.execute("""
+                     CREATE TEMP TABLE {input_with_id} AS
+                         SELECT
+                            {p_col_as_str},
+                            *,
+                            nextval('{seq_gen}') AS {id_col_name},
+                            CASE
+                                {sym_str}
+                            END AS {symbol_name_str}
+                         FROM {source_table}
+                     {distribution}
+                    """.format(**locals()))
+        # Explanation for computing the path matches:
+        #   Match is performed using regular expression pattern matching on a
+        #   string produced by concatenating the symbols. The exact rows that
+        #   produce the match are identified by correlating the matched string
+        #   indices with another array containing row ids.
+        #
+        #   matched_partitions: For each partition (group), concatenate all symbols
+        #       into a single string (sym_str). Keep corresponding ids in an array in the
+        #       same order as the symbols. This is performed only for partitions
+        #       that contain a match.
+        #   build_multiple_matched_rows:
+        #       q1: Split sym_str into an array containing the lengths of the
+        #           strings between the matches.
+        #       q2: Store lengths of matches into an array
+        #       q3: Merge q1 and q2 and unnest the arrays (ensuring same length).
+        #           Also right shift the matches array.
+        #       q4: Compute the cumulative sum of the arrays.
+        plpy.execute("""
+            CREATE TABLE {matched_partitions} AS
+                SELECT
+                    {p_col_name_str},
+                    array_to_string(array_agg({symbol_name_str} ORDER BY {order_expr}), '') as sym_str,
+                    array_agg({id_col_name} ORDER BY {order_expr}) as matched_ids
+                FROM {input_with_id}
+                GROUP BY {p_col_name_str}
+                HAVING array_to_string(array_agg({symbol_name_str} ORDER BY {order_expr}), '') ~* '{pattern_expr}'
+            """.format(**locals()))
+        build_multiple_matched_rows = """
+            CREATE {table_or_view} {matched_rows} AS
             SELECT {all_input_cols_str}
             FROM
-                {input_with_id} as q1,
+                {input_with_id} as source,
                 (
-                 SELECT
-                    unnest(match_ids[match_position+1:match_position+match_length]) as match_ids
-                 FROM(
+                    SELECT
+                        unnest(matched_ids[l:r]) AS matched_ids
+                    FROM
+                    (
                      SELECT
-                        length(substring(array_to_string(
-                                    array_agg(symbol ORDER BY {order_expr}), '')
-                               FROM '(?i)(^.*){pattern_expr}')) as match_position,
-                        length(substring(array_to_string(
-                                    array_agg(symbol ORDER BY {order_expr}), '')
-                               FROM '(?i)^.*({pattern_expr}).*$')) as match_length,
-                        array_agg({id_col_name} ORDER BY {order_expr}) as match_ids
-                     FROM {input_with_id}
-                     GROUP BY {partition_expr}
-                     HAVING
-                        array_to_string(array_agg(symbol ORDER BY {order_expr}), '')
-                            ~* '{pattern_expr}'
-                 ) subq
-                ) q2
-            WHERE q1.{id_col_name} = q2.match_ids
-        );
-    """.format(**locals())
-    plpy.execute(build_matched_rows)
-    plpy.execute("""
-        CREATE TABLE {output_table} AS
-           SELECT
-            {as_partition_expr},
-            {agg_func}
-           FROM {matched_rows}
-           GROUP BY {partition_expr}
-        """.format(**locals()))
-    if not persist_rows:
-        plpy.execute("DROP VIEW IF EXISTS " + matched_rows)
-    plpy.execute("DROP TABLE IF EXISTS " + input_with_id)
-    plpy.execute("DROP SEQUENCE IF EXISTS " + seq_gen)
+                        matched_ids,
+                        unnest(left_range) AS l,
+                        unnest(right_range) AS r
+                     FROM (
+                        SELECT
+                            {p_col_name_str},
+                            matched_ids,
+                            {m}.array_cum_sum({m}.array_add(match_splits, prev_matches)) AS left_range,
+                            {m}.array_cum_sum({m}.array_add(match_splits, matches)) AS right_range
+                        FROM (
+                            SELECT
+                               {p_col_name_str}, q1.matched_ids,
+                               match_splits[1:array_upper(matches, 1)] AS match_splits,
+                               matches AS matches,
+                               array_prepend(1, matches[1:array_upper(matches, 1) - 1]) AS prev_matches
+                            FROM (
+                                    -- lengths of text between matches
+                                    SELECT
+                                        {p_col_name_str},
+                                        matched_ids,
+                                        array_agg(length(match_splits) ORDER BY match_index) AS match_splits
+                                    FROM (
+                                        SELECT
+                                            {p_col_name_str},
+                                            matched_ids,
+                                            generate_series(1, array_upper(match_splits, 1)) AS match_index,
+                                            unnest(match_splits) AS match_splits
+                                        FROM (
+                                            SELECT
+                                                {p_col_name_str},
+                                                matched_ids,
+                                                regexp_split_to_array(sym_str, '(?i){pattern_expr}') AS match_splits
+                                            FROM
+                                                {matched_partitions}
+                                            ) ssubq1
+                                        ) subq1
+                                    GROUP BY {p_col_name_str}, matched_ids
+                                ) q1
+                                JOIN
+                                (
+                                    -- lengths of all matches
+                                    SELECT
+                                        {p_col_name_str},
+                                        matched_ids,
+                                        array_agg(length(matches) ORDER BY match_index) AS matches
+                                    FROM (
+                                        SELECT
+                                            {p_col_name_str},
+                                            matched_ids,
+                                            generate_series(1, num_matches) AS match_index,
+                                            (regexp_matches(
+                                                   sym_str, '(?i)({pattern_expr})', 'g'))[1] AS matches
+                                        FROM (
+                                            SELECT
+                                                {p_col_name_str},
+                                                matched_ids,
+                                                sym_str,
+                                                count(matches) AS num_matches
+                                            FROM (
+                                                SELECT
+                                                    {p_col_name_str},
+                                                    matched_ids,
+                                                    sym_str,
+                                                    (regexp_matches(
+                                                           sym_str, '(?i)({pattern_expr})', 'g'))[1] AS matches
+                                                FROM
+                                                   {matched_partitions}
+                                                ) t1
+                                            GROUP BY {p_col_name_str}, matched_ids, sym_str
+                                            ) t2
+                                        ) subq2
+                                    GROUP BY {p_col_name_str}, matched_ids
+                                ) q2
+                                USING ({p_col_name_str})
+                            GROUP BY {p_col_name_str}, q1.matched_ids, match_splits, matches
+                            ) q3
+                        ) q4
+                    ) q5
+                ) matched_rows
+            WHERE source.{id_col_name} = matched_rows.matched_ids
+         """.format(m=schema_madlib, **locals())
+        plpy.execute(build_multiple_matched_rows)
+        quoted_split_p_cols = [quote_ident(i) for i in split_p_cols]
+        p_col_orig_name_str = ','.join(
+            [i + " AS " + j for i, j in zip(p_col_names, quoted_split_p_cols)])
+
+        plpy.execute("""
+            CREATE TABLE {output_table} AS
+               SELECT
+                    {as_partition_expr},
+                    {agg_func}
+               FROM {matched_rows}
+               GROUP BY {partition_expr}
+            """.format(**locals()))
+        if not persist_rows:
+            plpy.execute("DROP VIEW IF EXISTS " + matched_rows)
+            plpy.execute("DROP TABLE IF EXISTS " + input_with_id)
+            plpy.execute("DROP SEQUENCE IF EXISTS " + seq_gen)
+    result = "Result available in table " + output_table
+    if persist_rows:
+        result += "\n Matched tuples can be found in table " + matched_rows
+    return result
 # ------------------------------------------------------------------------------
 
 
@@ -139,6 +263,11 @@ def _validate(source_table, output_table, partition_expr, order_expr,
     if persist_rows:
         output_tbl_valid(add_postfix(output_table, "_tuples"), 'Path')
 
+    plpy.execute("""SELECT {partition_expr}
+                    FROM {source_table}
+                    ORDER BY {order_expr}
+                    LIMIT 0
+                 """.format(**locals()))
     # ensure the expressions are not None or empty strings
     _assert(partition_expr, "Path error: Invalid partition expression")
     _assert(order_expr, "Path error: Invalid order expression")
@@ -168,6 +297,7 @@ def _parse_symbol_str(symbol_expr):
                         WHEN MARKET THEN start >= \'9:30:00\' and start < \'16:00:00\'
                        END"
     """
+
     all_symbols = iter(string.ascii_lowercase + string.digits)
     symbol_expr_parser = shlex.shlex(symbol_expr)
     symbol_expr_parser.wordchars = [i for i in string.printable
@@ -217,8 +347,8 @@ SELECT {schema_madlib}.path(
     'output_table',    -- Table name to store the path results
     'partition_expr',  -- Partition expression to group the data table
     'order_expr',      -- Order expression to sort the tuples of the data table
-    'pattern_def',     -- Definition of the path pattern to search for
     'symbol_def',      -- Definition of various symbols used in the pattern definition
+    'pattern_def',     -- Definition of the path pattern to search for
     'agg_func',        -- Aggregate/window functions to be applied on the matched paths
     persist_rows       -- Boolean indicating whether to output the matched rows
                        --  in an additional table (named <output_table>_tuples)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/path.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/path.sql_in b/src/ports/postgres/modules/utilities/path.sql_in
index db06fb6..a538449 100644
--- a/src/ports/postgres/modules/utilities/path.sql_in
+++ b/src/ports/postgres/modules/utilities/path.sql_in
@@ -59,8 +59,8 @@ path(
     output_table,
     partition_expr,
     order_expr,
-    pattern,
     symbol,
+    pattern,
     aggregate_func,
     persist_rows
 )
@@ -72,19 +72,26 @@ path(
     <dd>VARCHAR. Name of the source table, containing data for path analysis.</dd>
 
     <dt>output_table</dt>
-    <dd>VARCHAR. Name of the result table. This table contains columns predicated
-    by the <em>result</em> argument (shown below).</dd>
+    <dd>VARCHAR. Name of the result table. This table contains columns
+    predicated by the <em>result</em> argument (shown below).</dd>
 
     <dt>partition_expr</dt>
-    <dd>VARCHAR. The 'partition_expr' can be a list of columns or expressions (separated by comma)
-    to divide all rows into groups, or partitions, that share the same values of the
-    partition expression(s). For each row, the matching is applied across the rows
-    that fall into the same partition. This can be NULL or '' to indicate the matching
-    is to be applied on the whole table.</dd>
+    <dd>VARCHAR. The 'partition_expr' can be a single column or a list of
+    comma-separated columns/expressions to divide all rows into groups,
+    or partitions, that share the same values of the partition expression(s).
+    The matching is applied across the rows that fall into the same partition.
+    This can be NULL or '' to indicate the matching is to be applied on the
+    whole table.</dd>
 
     <dt>order_expr</dt>
-    <dd>VARCHAR. This expression controls the order in which rows are processed or matched in a
-    partition.
+    <dd>VARCHAR. This expression controls the order in which rows are
+    processed or matched in a partition.
+    </dd>
+
+    <dt>symbol</dt>
+    <dd>VARCHAR. A symbol represents a row of a particular type that you’re
+    searching for as part of a row sequence. In the SYMBOLS clause, you write a
+    predicate to define the type of row that matches the symbol.
     </dd>
 
     <dt>pattern</dt>
@@ -104,12 +111,6 @@ path(
         </ul>
     </dd>
 
-    <dt>symbol</dt>
-    <dd>VARCHAR. A symbol represents a row of a particular type that you’re searching
-for as part of a row sequence. In the SYMBOLS clause, you write a predicate to define the
-type of row that matches the symbol.
-    </dd>
-
     <dt>aggregate_func</dt>
     <dd>VARCHAR. A comma-separated list of window functions and aggregates to be
     applied on the matched window.
@@ -129,8 +130,8 @@ type of row that matches the symbol.
 
 - Build sample dataset
 <pre class="example">
-CREATE TABLE data (id integer, sessionid integer, starttime timestamp, topic varchar, portfolio integer);
-COPY data FROM STDIN DELIMITER AS '|';
+CREATE TABLE trades (id integer, sessionid integer, starttime timestamp, topic varchar, portfolio integer);
+COPY trades FROM STDIN DELIMITER AS '|';
 1|1|1/01/00 5:00 AM|Real-time Equity pricing|769
 1|2|1/01/00 5:30 AM|Real-time Index pricing|9898
 1|3|1/01/00 6:00 AM|Real-time Index pricing|9898
@@ -166,15 +167,6 @@ COPY data FROM STDIN DELIMITER AS '|';
 2|13|1/04/00 12:00 AM|Bond pricing|55
 2|14|1/04/00 12:00 AM|Bond pricing|55
 \\.
-&nbsp;
-CREATE TABLE trades AS
-SELECT *, starttimestamp::date startdate,
-        CASE WHEN  THEN 'before'
-            WHEN  THEN 'market'
-            WHEN  THEN 'close'
-            WHEN  THEN 'after'
-        END tradingperiod
-FROM sessiontable
 </pre>
 
 - Compare within each day for the same Topic/Portfolio across every user
@@ -182,15 +174,14 @@ FROM sessiontable
     SELECT madlib.path(
         'trades',
         'trades_out',
-        'startdate, topic, portfolio'  \-\- each day of activity is looked at independently
-        'starttime'                     \-\- order by time
-        'BEFORE*.MARKET+.CLOSE+.AFTER*' \-\- at least one event during each of MARKET and CLOSE, but gather up the rest
-        'BEFORE:=starttimestamp::time >= ''0:00:00'' and starttimestamp::time < ''9:30:00''::time,
-         MARKET:=starttimestamp::time >= ''9:30:00'' and starttimestamp::time < ''16:00:00''::time,
-         CLOSE:= starttimestamp::time <= ''16:00:00'' and starttimestamp::time < ''16:30:00''::time,
-         AFTER:= starttimestamp::time <= ''16:30:00'' and starttimestamp::time < ''24:00:00''::time
-        ',
-        'first(startdate) as starttime, array_agg(id) as all_users, count(*) as num_matches'
+        'starttime::date, topic, portfolio',  \-\- each day of activity is looked at independently
+        'starttime',                    \-\- order by time
+        'BEFORE:=starttime::time >= ''0:00:00'' and starttime::time < ''9:30:00''::time,'
+            'MARKET:=starttime::time >= ''9:30:00'' and starttime::time < ''16:00:00''::time,'
+            'CLOSE:= starttime::time >= ''16:00:00'' and starttime::time < ''16:30:00''::time,'
+            'AFTER:= starttime::time >= ''16:30:00'' and starttime::time < ''24:00:00''::time',
+        'BEFORE*.MARKET+.CLOSE+.AFTER*', \-\- at least one event during each of MARKET and CLOSE, but gather up the rest
+        'count(*) as num_matches'
     )
 </pre>
 */
@@ -202,11 +193,11 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(
     output_table        VARCHAR,
     partition_expr      VARCHAR,
     order_expr          VARCHAR,
-    pattern_expr        VARCHAR,
     symbol_expr         VARCHAR,
+    pattern_expr        VARCHAR,
     agg_func            VARCHAR,
     persist_rows        BOOLEAN
-) RETURNS void AS $$
+) RETURNS TEXT AS $$
 PythonFunction(utilities, path, path)
 $$ LANGUAGE plpythonu
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -219,10 +210,10 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(
     output_table        VARCHAR,
     partition_expr      VARCHAR,
     order_expr          VARCHAR,
-    pattern_expr        VARCHAR,
     symbol_expr         VARCHAR,
+    pattern_expr        VARCHAR,
     agg_func            VARCHAR
-) RETURNS void AS $$
+) RETURNS TEXT AS $$
     SELECT MADLIB_SCHEMA.path($1, $2, $3, $4, $5, $6, $7, FALSE)
 $$ LANGUAGE SQL
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/test/path.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/path.sql_in b/src/ports/postgres/modules/utilities/test/path.sql_in
index 44d89c8..c568938 100644
--- a/src/ports/postgres/modules/utilities/test/path.sql_in
+++ b/src/ports/postgres/modules/utilities/test/path.sql_in
@@ -84,8 +84,8 @@ SELECT madlib.path(
      '"Path_output"',         -- Table name to store the path results
      'user_id',             -- Partition expression to group the data table
      'event_timestamp ASC',         -- Order expression to sort the tuples of the data table
-     'I(click){1}(CONV){1}',        -- Definition of the path pattern to search for
      'I:=click_event=0 AND purchase_event=0, Click:=click_event=1 AND purchase_event=0, Conv:=purchase_event=1',    -- Definition of various symbols used in the pattern definition
+     'I(click){1}(CONV){1}',        -- Definition of the path pattern to search for
      'COUNT(*)'             -- Aggregate/window functions to be applied on the matched paths
     ,TRUE
      );

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/validate_args.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/validate_args.py_in b/src/ports/postgres/modules/utilities/validate_args.py_in
index f3591d7..7681489 100644
--- a/src/ports/postgres/modules/utilities/validate_args.py_in
+++ b/src/ports/postgres/modules/utilities/validate_args.py_in
@@ -1,6 +1,9 @@
+
 import plpy
-from utilities import __mad_version
 import re
+import string
+from utilities import __mad_version
+
 
 version_wrapper = __mad_version()
 _string_to_array = version_wrapper.select_vecfunc()
@@ -24,7 +27,7 @@ DELETE, or UPDATE statements).
 """
 
 
-def _unquote_name(input_str):
+def unquote_ident(input_str):
     """
     Returns input_str with starting and trailing double quotes stripped
 
@@ -51,6 +54,42 @@ def _unquote_name(input_str):
 # -------------------------------------------------------------------------
 
 
+def quote_ident(input_str):
+    """
+    Returns input_str with quotes added per Postgres identifier rules.
+
+    This function is available via plpy.quote_ident in PG > 9.1. We add this
+    function for compatibility with Greenplum and HAWQ.
+
+    If the input_str is a lower case string with characters in [a-z0-9_] then the
+    string is returned as is, else a double quote is added in front and back of the string.
+    Every double quote in the original string is preceded by another double quote.
+
+    Note: we don't check for SQL keywords. plpy.quote_ident is a better alternative
+    when available.
+
+    Args:
+        @param input_str
+
+    Returns:
+        String
+    """
+
+    def quote_not_needed(ch):
+        return (ch in string.ascii_lowercase or ch in string.digits or ch == '_')
+
+    if input_str:
+        input_str = input_str.strip()
+        if all(quote_not_needed(c) for c in input_str):
+            return input_str
+        else:
+            # if input_str has double quotes then each double quote
+            # is prepended with a double quote
+            # (the 1st double quote is used to escape the 2nd double quote)
+            return '"' + re.sub(r'"', r'""', input_str) + '"'
+# -------------------------------------------------------------------------
+
+
 def _get_table_schema_names(tbl, only_first_schema=False):
     """
     Returns a pair containing a set of schema names and the table name from
@@ -80,12 +119,12 @@ def _get_table_schema_names(tbl, only_first_schema=False):
         else:
             all_schemas = _string_to_array(plpy.execute(
                 "SELECT current_schemas(True) ""AS cs")[0]["cs"])
-        schema_str = "('{0}')".format("','".join(_unquote_name(s)
+        schema_str = "('{0}')".format("','".join(unquote_ident(s)
                                                  for s in all_schemas))
-        table = _unquote_name(names[0])
+        table = unquote_ident(names[0])
     elif len(names) == 2:
-        schema_str = "('" + _unquote_name(names[0]) + "')"
-        table = _unquote_name(names[1])
+        schema_str = "('" + unquote_ident(names[0]) + "')"
+        table = unquote_ident(names[1])
     else:
         plpy.error("Incorrect table name ({0}) provided! Table name "
                    "should be of the form: <schema name>.<table name>".format(tbl))
@@ -199,7 +238,7 @@ def get_first_schema(table_name):
         raise TypeError("Incorrect table name ({0}) provided! Table name should be "
                         "of the form: <schema name>.<table name>".format(table_name))
     elif len(names) == 2:
-        return _unquote_name(names[0])
+        return unquote_ident(names[0])
 
     # create a list of schema names in search path
     # _string_to_array is used for GPDB versions less than 4.2 where an array
@@ -342,9 +381,9 @@ def columns_exist_in_table(tbl, cols, schema_madlib="madlib"):
     Returns:
         True if all columns in 'cols' exist in source table else False
     """
-    existing_cols = set(_unquote_name(i) for i in get_cols(tbl, schema_madlib))
+    existing_cols = set(unquote_ident(i) for i in get_cols(tbl, schema_madlib))
     for col in cols:
-        if not col or _unquote_name(col) not in existing_cols:
+        if not col or unquote_ident(col) not in existing_cols:
             return False
     return True
 # -------------------------------------------------------------------------
@@ -362,11 +401,11 @@ def columns_missing_from_table(tbl, cols):
     """
     if not cols:
         return []
-    existing_cols = set(_unquote_name(i) for i in get_cols(tbl))
+    existing_cols = set(unquote_ident(i) for i in get_cols(tbl))
     # column is considered missing if the name is invalid (None or empty) or
     #  if the column is not present in the table
     return [col for col in cols
-            if not col or _unquote_name(col) not in existing_cols]
+            if not col or unquote_ident(col) not in existing_cols]
 # -------------------------------------------------------------------------
 
 
@@ -387,7 +426,7 @@ def is_col_array(tbl, col):
         plpy.error("Input error: Invalid table {0}".format(tbl))
     if not col:
         plpy.error("Input error: Invalid column name {0}".format(col))
-    col = _unquote_name(col)
+    col = unquote_ident(col)
 
     data_type_list = plpy.execute(
         """
@@ -563,10 +602,10 @@ class TestValidateFunctions(unittest.TestCase):
                          _get_table_schema_names('test_schema.test_table'))
         self.assertEqual(('"test_schema"', '"test_table"'),
                          _get_table_schema_names('"test_schema"."test_table"'))
-        self.assertEqual('Test', _unquote_name('"Test"'))
-        self.assertEqual('test', _unquote_name('Test'))
-        self.assertEqual('Test123', _unquote_name('"Test123"'))
-        self.assertEqual('test', _unquote_name('"test"'))
+        self.assertEqual('Test', unquote_ident('"Test"'))
+        self.assertEqual('test', unquote_ident('Test'))
+        self.assertEqual('Test123', unquote_ident('"Test123"'))
+        self.assertEqual('test', unquote_ident('"test"'))
 
 
 if __name__ == '__main__':