Posted to commits@madlib.apache.org by xt...@apache.org on 2016/02/25 22:27:24 UTC
incubator-madlib git commit: Path: Allow multiple matches in single partition
Repository: incubator-madlib
Updated Branches:
refs/heads/master 06f1cb893 -> 33c0a72c6
Path: Allow multiple matches in single partition
JIRA: MADLIB-917
The Path query was rewritten to use array functions. This was necessary
because allowing multiple matches produces a set of matches for each
group, and that set has to be unnested. Unnesting was not possible in
GPDB, since the GROUP BY placed the executor in a context that does not
allow set-returning functions (SRF). The solution is to avoid the unnest
inside the grouped query by using array functions for all computations
and unnesting only at the end, outside the GROUP BY.
Closes #21
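For intuition, here is a minimal sketch of the rewritten query shape. The
table and column names (symbolized_input, part_key, ord, symbol, row_id) and
the pattern are invented for illustration, not the identifiers generated by
the patch. The per-partition arrays are built under the GROUP BY, while the
set-returning unnest() runs only in the outer query, where GPDB permits it:

    SELECT part_key, unnest(matched_ids) AS matched_id
    FROM (
        SELECT
            part_key,
            array_to_string(array_agg(symbol ORDER BY ord), '') AS sym_str,
            array_agg(row_id ORDER BY ord)                      AS matched_ids
        FROM symbolized_input
        GROUP BY part_key
        HAVING array_to_string(array_agg(symbol ORDER BY ord), '') ~* 'a(b)+c'
    ) per_partition;

The actual patch goes further: instead of unnesting whole partitions, it
slices matched_ids with match ranges computed via the new array functions,
so only the rows belonging to a match are returned.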
Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/33c0a72c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/33c0a72c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/33c0a72c
Branch: refs/heads/master
Commit: 33c0a72c69e9c986067eddad99c8977c7874e9f9
Parents: 06f1cb8
Author: Rahul Iyer <ri...@pivotal.io>
Authored: Thu Dec 24 14:15:36 2015 -0800
Committer: Xiaocheng Tang <xi...@gmail.com>
Committed: Thu Feb 25 13:26:26 2016 -0800
----------------------------------------------------------------------
doc/mainpage.dox.in | 2 +-
methods/array_ops/src/pg_gp/array_ops.c | 136 +++++++++
methods/array_ops/src/pg_gp/array_ops.sql_in | 28 ++
.../recursive_partitioning/decision_tree.py_in | 8 +-
.../postgres/modules/regress/margins.py_in | 2 -
src/ports/postgres/modules/utilities/path.py_in | 280 ++++++++++++++-----
.../postgres/modules/utilities/path.sql_in | 71 ++---
.../postgres/modules/utilities/test/path.sql_in | 2 +-
.../modules/utilities/validate_args.py_in | 71 +++--
9 files changed, 461 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/doc/mainpage.dox.in
----------------------------------------------------------------------
diff --git a/doc/mainpage.dox.in b/doc/mainpage.dox.in
index 3f31f5e..59475fd 100644
--- a/doc/mainpage.dox.in
+++ b/doc/mainpage.dox.in
@@ -233,7 +233,7 @@ Interface and implementation are subject to change.
@defgroup grp_cg Conjugate Gradient
@defgroup grp_bayes Naive Bayes Classification
- @defgroup grp_path Pathing functions
+ @defgroup grp_path Path functions
@defgroup grp_sample Random Sampling
@defgroup grp_kernmach Support Vector Machines
@}
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/methods/array_ops/src/pg_gp/array_ops.c
----------------------------------------------------------------------
diff --git a/methods/array_ops/src/pg_gp/array_ops.c b/methods/array_ops/src/pg_gp/array_ops.c
index 57ee3bf..598df85 100644
--- a/methods/array_ops/src/pg_gp/array_ops.c
+++ b/methods/array_ops/src/pg_gp/array_ops.c
@@ -47,6 +47,8 @@ static ArrayType *General_2Array_to_Array(ArrayType *v1, ArrayType *v2,
Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid));
static ArrayType *General_Array_to_Array(ArrayType *v1, Datum value,
Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid));
+static ArrayType *General_Array_to_Cumulative_Array(ArrayType *v1, Datum value,
+ Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid));
static Datum General_2Array_to_Element(ArrayType *v1, ArrayType *v2,
Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid),
Datum(*finalize_function)(Datum,int,Oid));
@@ -491,6 +493,7 @@ array_sum(PG_FUNCTION_ARGS){
return float8_datum_cast(DatumGetFloat8(res), element_type);
}
+
/*
* This function returns sum of the array elements' absolute value.
*/
@@ -893,6 +896,36 @@ array_scalar_add(PG_FUNCTION_ARGS){
}
/*
+ * This function returns the cumulative sum of the array elements.
+ */
+PG_FUNCTION_INFO_V1(array_cum_sum);
+Datum
+array_cum_sum(PG_FUNCTION_ARGS){
+ if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); }
+
+ ArrayType *v = PG_GETARG_ARRAYTYPE_P(0);
+ ArrayType *res = General_Array_to_Cumulative_Array(v, Float8GetDatum(0.0), element_add);
+
+ PG_FREE_IF_COPY(v, 0);
+ PG_RETURN_ARRAYTYPE_P(res);
+}
+
+/*
+ * This function returns the cumulative product of the array elements.
+ */
+PG_FUNCTION_INFO_V1(array_cum_prod);
+Datum
+array_cum_prod(PG_FUNCTION_ARGS){
+ if (PG_ARGISNULL(0)) { PG_RETURN_NULL(); }
+
+ ArrayType *v = PG_GETARG_ARRAYTYPE_P(0);
+ ArrayType *res = General_Array_to_Cumulative_Array(v, Float8GetDatum(1.0), element_mult);
+
+ PG_FREE_IF_COPY(v, 0);
+ PG_RETURN_ARRAYTYPE_P(res);
+}
+
+/*
* This function removes elements with specified value from an array.
*/
PG_FUNCTION_INFO_V1(array_filter);
@@ -1970,3 +2003,106 @@ General_Array_to_Array(
return pgarray;
}
+
+
+/*
+ * @brief Transforms an array to another array using cumulative operations.
+ *
+ * @param v1 Array.
+ * @param initial Parameter.
+ * @param element_function Map function.
+ * @returns Transformed array.
+ */
+ArrayType*
+General_Array_to_Cumulative_Array(
+ ArrayType *v1,
+ Datum initial,
+ Datum(*element_function)(Datum,Oid,Datum,Oid,Datum,Oid)) {
+
+ // dimensions
+ int ndims1 = ARR_NDIM(v1);
+ if (ndims1 == 0) {
+ elog(WARNING, "input are empty arrays.");
+ return v1;
+ }
+ int ndims = ndims1;
+ int *lbs1 = ARR_LBOUND(v1);
+ int *dims1 = ARR_DIMS(v1);
+ int *dims = (int *) palloc(ndims * sizeof(int));
+ int *lbs = (int *) palloc(ndims * sizeof(int));
+ int i = 0;
+ for (i = 0; i < ndims; i ++) {
+ dims[i] = dims1[i];
+ lbs[i] = lbs1[i];
+ }
+ int nitems = ArrayGetNItems(ndims, dims);
+
+ // nulls
+ if (ARR_HASNULL(v1)) {
+ ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("arrays cannot contain nulls"),
+ errdetail("Arrays with element value NULL are not allowed.")));
+ }
+
+ // type
+ Oid element_type = ARR_ELEMTYPE(v1);
+ TypeCacheEntry *typentry = lookup_type_cache(element_type,TYPECACHE_CMP_PROC_FINFO);
+ int type_size = typentry->typlen;
+ bool typbyval = typentry->typbyval;
+ char typalign = typentry->typalign;
+
+ // allocate
+ Datum *result = NULL;
+ switch (element_type) {
+ case INT2OID:
+ case INT4OID:
+ case INT8OID:
+ case FLOAT4OID:
+ case FLOAT8OID:
+ case NUMERICOID:
+ result = (Datum *)palloc(nitems * sizeof(Datum));break;
+ default:
+ ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("type is not supported"),
+ errdetail("Arrays with element type %s are not supported.",
+ format_type_be(element_type))));
+ break;
+ }
+
+ // iterate
+ Datum *resultp = result;
+ char *dat1 = ARR_DATA_PTR(v1);
+ float prev_dat = DatumGetFloat8(initial);
+ for (i = 0; i < nitems; i ++) {
+ // iterate elt1
+ Datum elt1 = fetch_att(dat1, typbyval, type_size);
+ dat1 = att_addlength_pointer(dat1, type_size, dat1);
+ dat1 = (char *) att_align_nominal(dat1, typalign);
+
+ *resultp = element_function(elt1,
+ element_type,
+ elt1, /* placeholder */
+ element_type, /* placeholder */
+ prev_dat,
+ element_type);
+ prev_dat = *resultp;
+ resultp++;
+ }
+
+ // construct return result
+ ArrayType *pgarray = construct_md_array(result,
+ NULL,
+ ndims,
+ dims,
+ lbs,
+ element_type,
+ type_size,
+ typbyval,
+ typalign);
+
+ pfree(result);
+ pfree(dims);
+ pfree(lbs);
+
+ return pgarray;
+}
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/methods/array_ops/src/pg_gp/array_ops.sql_in
----------------------------------------------------------------------
diff --git a/methods/array_ops/src/pg_gp/array_ops.sql_in b/methods/array_ops/src/pg_gp/array_ops.sql_in
index 80f50ab..6179f26 100644
--- a/methods/array_ops/src/pg_gp/array_ops.sql_in
+++ b/methods/array_ops/src/pg_gp/array_ops.sql_in
@@ -620,3 +620,31 @@ DROP FUNCTION IF EXISTS MADLIB_SCHEMA.array_contains_null(float8[]) CASCADE;
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_contains_null(x anyarray) RETURNS BOOLEAN
AS 'MODULE_PATHNAME', 'array_contains_null' LANGUAGE C IMMUTABLE STRICT
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+/**
+ * @brief This function takes an array as the input and computes the
+ * cumulative sum with the first element being the same.
+ * It requires that all the values are NON-NULL.
+ *
+ * @param x Array x
+ * @returns Cumulative sum of the elements in x.
+ *
+ */
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_cum_sum(x anyarray) RETURNS anyarray
+AS 'MODULE_PATHNAME', 'array_cum_sum'
+LANGUAGE C IMMUTABLE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+/**
+ * @brief This function takes an array as the input and computes the
+ * cumulative product with the first element being the same.
+ * It requires that all the values are NON-NULL.
+ *
+ * @param x Array x
+ * @returns Cumulative product of the elements in x.
+ *
+ */
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_cum_prod(x anyarray) RETURNS anyarray
+AS 'MODULE_PATHNAME', 'array_cum_prod'
+LANGUAGE C IMMUTABLE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
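Assuming the default 'madlib' installation schema, the new cumulative
functions behave as sketched below (expected results, not output copied from
a test run):

    SELECT madlib.array_cum_sum(ARRAY[1, 2, 3, 4]::float8[]);
    -- {1,3,6,10}   (first element unchanged, then running sums)

    SELECT madlib.array_cum_prod(ARRAY[1, 2, 3, 4]::float8[]);
    -- {1,2,6,24}   (first element unchanged, then running products)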
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in b/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
index 6fc8689..4d15e43 100644
--- a/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
+++ b/src/ports/postgres/modules/recursive_partitioning/decision_tree.py_in
@@ -26,7 +26,7 @@ from utilities.validate_args import table_exists
from utilities.validate_args import table_is_empty
from utilities.validate_args import columns_exist_in_table
from utilities.validate_args import is_var_valid
-from utilities.validate_args import _unquote_name
+from utilities.validate_args import unquote_ident
from utilities.utilities import _assert
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import unique_string
@@ -113,12 +113,12 @@ def _get_features_to_use(schema_madlib, training_table_name,
# list.
if grouping_cols:
group_set = set(split_quoted_delimited_str(grouping_cols))
- group_set |= set(_unquote_name(i)
+ group_set |= set(unquote_ident(i)
for i in split_quoted_delimited_str(grouping_cols))
else:
group_set = set()
other_col_set = set([id_col_name, weights, dependent_variable])
- other_col_set |= set(_unquote_name(i)
+ other_col_set |= set(unquote_ident(i)
for i in [id_col_name, weights, dependent_variable])
if list_of_features.strip() == '*':
@@ -139,7 +139,7 @@ def _get_features_to_use(schema_madlib, training_table_name,
def _get_col_value(input_dict, col_name):
"""Return value from dict where key could be quoted or unquoted name"""
return input_dict.get(
- col_name, input_dict.get(_unquote_name(col_name)))
+ col_name, input_dict.get(unquote_ident(col_name)))
# -------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/regress/margins.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/regress/margins.py_in b/src/ports/postgres/modules/regress/margins.py_in
index 6858f7a..15c5f9d 100644
--- a/src/ports/postgres/modules/regress/margins.py_in
+++ b/src/ports/postgres/modules/regress/margins.py_in
@@ -23,8 +23,6 @@ from margins_builder import TermBase
import re
#------------------------------------------------------------------------------
-m4_changequote(`<!', `!>')
-
def margins(schema_madlib, model_table, out_table, x_design=None,
source_table=None, marginal_vars=None, *args, **kwargs):
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/path.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/path.py_in b/src/ports/postgres/modules/utilities/path.py_in
index 44b4d81..67abf64 100644
--- a/src/ports/postgres/modules/utilities/path.py_in
+++ b/src/ports/postgres/modules/utilities/path.py_in
@@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,15 +27,20 @@ import shlex
import string
import re
+from control import MinWarning
from utilities import unique_string
from utilities import _assert
from utilities import add_postfix
from validate_args import get_cols
from validate_args import input_tbl_valid
from validate_args import output_tbl_valid
+from validate_args import quote_ident
# ------------------------------------------------------------------------
+m4_changequote(`<!', `!>')
+
+
def path(schema_madlib, source_table, output_table, partition_expr,
order_expr, pattern_expr, symbol_expr, agg_func,
persist_rows=False, **kwargs):
@@ -53,82 +58,201 @@ def path(schema_madlib, source_table, output_table, partition_expr,
@param agg_func: str, List of the result functions/aggregates to apply on matched patterns
"""
- if not partition_expr:
- partition_expr = "1 = 1"
- as_partition_expr = "1 = 1 as ALL"
- else:
- as_partition_expr = partition_expr
- _validate(source_table, output_table, partition_expr, order_expr,
- pattern_expr, symbol_expr, agg_func, persist_rows)
- # replace each occurence of the original symbol with the new
- # perform this operation in descending order of length to avoid substituting
- # subset of any symbol
- sym_mapping, sym_str = _parse_symbol_str(symbol_expr)
- old_sym_desc = list(reversed(sorted(sym_mapping.keys(), key=len)))
- replace_pattern = re.compile('|'.join(old_sym_desc), re.IGNORECASE)
- pattern_expr = replace_pattern.sub(
- lambda m: sym_mapping[re.escape(string.lower(m.group(0)))],
- pattern_expr)
- input_with_id = unique_string('input_with_id')
- matched_view = unique_string('matched_view')
- matched_rows = add_postfix(output_table, "_tuples") if persist_rows else unique_string('matched_rows')
- table_or_view = 'TABLE' if persist_rows else 'VIEW'
- id_col_name = unique_string('id_col')
- seq_gen = unique_string('seq_gen')
- all_input_cols_str = ', '.join(get_cols(source_table, schema_madlib))
-
- # build a new input temp table that contains a sequence
- plpy.execute("CREATE SEQUENCE " + seq_gen)
- plpy.execute("""
- CREATE TEMP TABLE {input_with_id} AS (
- SELECT
- *,
- nextval('{seq_gen}') AS {id_col_name},
- CASE
- {sym_str}
- END AS symbol
- FROM {source_table}
- )""".format(**locals()))
- build_matched_rows = """
- CREATE {table_or_view} {matched_rows} AS (
+ with MinWarning("error"):
+ if not partition_expr:
+ partition_expr = "1 = 1"
+ as_partition_expr = "1 = 1 as all"
+ else:
+ as_partition_expr = partition_expr
+ _validate(source_table, output_table, partition_expr, order_expr,
+ pattern_expr, symbol_expr, agg_func, persist_rows)
+
+ # replace each occurrence of the original symbol with the new
+ # perform this operation in descending order of length to avoid substituting
+ # subset of any symbol
+ sym_mapping, sym_str = _parse_symbol_str(symbol_expr)
+ old_sym_desc = list(reversed(sorted(sym_mapping.keys(), key=len)))
+ replace_pattern = re.compile('|'.join(old_sym_desc), re.IGNORECASE)
+ pattern_expr = replace_pattern.sub(
+ lambda m: sym_mapping[re.escape(string.lower(m.group(0)))],
+ pattern_expr)
+
+ # build variables for intermediate objects
+ input_with_id = unique_string('input_with_id')
+ matched_view = unique_string('matched_view')
+ id_col_name = unique_string('id_col')
+ matched_partitions = unique_string('matched_partitions')
+ seq_gen = unique_string('seq_gen')
+ symbol_name_str = unique_string('symbol')
+ all_input_cols_str = ', '.join(get_cols(source_table, schema_madlib))
+ if persist_rows:
+ matched_rows = add_postfix(output_table, "_tuples")
+ table_or_view = 'TABLE'
+ else:
+ matched_rows = unique_string('matched_rows')
+ table_or_view = 'VIEW'
+
+ # build a new input temp table that contains a sequence and partition columns
+ split_p_cols = [i.strip() for i in partition_expr.split(',')]
+ p_col_names = [unique_string() for i in split_p_cols]
+ p_col_as_str = ','.join(
+ [i + " AS " + j for i, j in zip(split_p_cols, p_col_names)])
+ p_col_name_str = ', '.join(p_col_names)
+ distribution = m4_ifdef(<!__POSTGRESQL__!>, <!''!>,
+ <!"DISTRIBUTED BY ({0})".format(p_col_name_str)!>)
+ plpy.execute("CREATE SEQUENCE " + seq_gen)
+ plpy.execute("""
+ CREATE TEMP TABLE {input_with_id} AS
+ SELECT
+ {p_col_as_str},
+ *,
+ nextval('{seq_gen}') AS {id_col_name},
+ CASE
+ {sym_str}
+ END AS {symbol_name_str}
+ FROM {source_table}
+ {distribution}
+ """.format(**locals()))
+ # Explanation for computing the path matches:
+ # Match is performed using regular expression pattern matching on a
+ # string produced by concatenating the symbols. The exact rows that
+ # produce the match are identified by correlating the matched string
+ # indices with another array containing row ids.
+ #
+ # matched_partitions: For each partition (group), concatenate all symbols
+ # into a single string (sym_str). Keep corresponding ids in an array in the
+ # same order as the symbols. This is performed only for partitions
+ # that contain a match.
+ # build_multiple_matched_rows:
+ # q1: Split sym_str into an array containing the lengths of the
+ # strings between the matches.
+ # q2: Store lengths of matches into an array
+ # q3: Merge q1 and q2 and unnest the arrays (ensuring same length).
+ # Also right shift the matches array.
+ # q4: Compute the cumulative sum of the arrays.
+ plpy.execute("""
+ CREATE TABLE {matched_partitions} AS
+ SELECT
+ {p_col_name_str},
+ array_to_string(array_agg({symbol_name_str} ORDER BY {order_expr}), '') as sym_str,
+ array_agg({id_col_name} ORDER BY {order_expr}) as matched_ids
+ FROM {input_with_id}
+ GROUP BY {p_col_name_str}
+ HAVING array_to_string(array_agg({symbol_name_str} ORDER BY {order_expr}), '') ~* '{pattern_expr}'
+ """.format(**locals()))
+ build_multiple_matched_rows = """
+ CREATE {table_or_view} {matched_rows} AS
SELECT {all_input_cols_str}
FROM
- {input_with_id} as q1,
+ {input_with_id} as source,
(
- SELECT
- unnest(match_ids[match_position+1:match_position+match_length]) as match_ids
- FROM(
+ SELECT
+ unnest(matched_ids[l:r]) AS matched_ids
+ FROM
+ (
SELECT
- length(substring(array_to_string(
- array_agg(symbol ORDER BY {order_expr}), '')
- FROM '(?i)(^.*){pattern_expr}')) as match_position,
- length(substring(array_to_string(
- array_agg(symbol ORDER BY {order_expr}), '')
- FROM '(?i)^.*({pattern_expr}).*$')) as match_length,
- array_agg({id_col_name} ORDER BY {order_expr}) as match_ids
- FROM {input_with_id}
- GROUP BY {partition_expr}
- HAVING
- array_to_string(array_agg(symbol ORDER BY {order_expr}), '')
- ~* '{pattern_expr}'
- ) subq
- ) q2
- WHERE q1.{id_col_name} = q2.match_ids
- );
- """.format(**locals())
- plpy.execute(build_matched_rows)
- plpy.execute("""
- CREATE TABLE {output_table} AS
- SELECT
- {as_partition_expr},
- {agg_func}
- FROM {matched_rows}
- GROUP BY {partition_expr}
- """.format(**locals()))
- if not persist_rows:
- plpy.execute("DROP VIEW IF EXISTS " + matched_rows)
- plpy.execute("DROP TABLE IF EXISTS " + input_with_id)
- plpy.execute("DROP SEQUENCE IF EXISTS " + seq_gen)
+ matched_ids,
+ unnest(left_range) AS l,
+ unnest(right_range) AS r
+ FROM (
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ {m}.array_cum_sum({m}.array_add(match_splits, prev_matches)) AS left_range,
+ {m}.array_cum_sum({m}.array_add(match_splits, matches)) AS right_range
+ FROM (
+ SELECT
+ {p_col_name_str}, q1.matched_ids,
+ match_splits[1:array_upper(matches, 1)] AS match_splits,
+ matches AS matches,
+ array_prepend(1, matches[1:array_upper(matches, 1) - 1]) AS prev_matches
+ FROM (
+ -- lengths of text between matches
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ array_agg(length(match_splits) ORDER BY match_index) AS match_splits
+ FROM (
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ generate_series(1, array_upper(match_splits, 1)) AS match_index,
+ unnest(match_splits) AS match_splits
+ FROM (
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ regexp_split_to_array(sym_str, '(?i){pattern_expr}') AS match_splits
+ FROM
+ {matched_partitions}
+ ) ssubq1
+ ) subq1
+ GROUP BY {p_col_name_str}, matched_ids
+ ) q1
+ JOIN
+ (
+ -- lengths of all matches
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ array_agg(length(matches) ORDER BY match_index) AS matches
+ FROM (
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ generate_series(1, num_matches) AS match_index,
+ (regexp_matches(
+ sym_str, '(?i)({pattern_expr})', 'g'))[1] AS matches
+ FROM (
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ sym_str,
+ count(matches) AS num_matches
+ FROM (
+ SELECT
+ {p_col_name_str},
+ matched_ids,
+ sym_str,
+ (regexp_matches(
+ sym_str, '(?i)({pattern_expr})', 'g'))[1] AS matches
+ FROM
+ {matched_partitions}
+ ) t1
+ GROUP BY {p_col_name_str}, matched_ids, sym_str
+ ) t2
+ ) subq2
+ GROUP BY {p_col_name_str}, matched_ids
+ ) q2
+ USING ({p_col_name_str})
+ GROUP BY {p_col_name_str}, q1.matched_ids, match_splits, matches
+ ) q3
+ ) q4
+ ) q5
+ ) matched_rows
+ WHERE source.{id_col_name} = matched_rows.matched_ids
+ """.format(m=schema_madlib, **locals())
+ plpy.execute(build_multiple_matched_rows)
+ quoted_split_p_cols = [quote_ident(i) for i in split_p_cols]
+ p_col_orig_name_str = ','.join(
+ [i + " AS " + j for i, j in zip(p_col_names, quoted_split_p_cols)])
+
+ plpy.execute("""
+ CREATE TABLE {output_table} AS
+ SELECT
+ {as_partition_expr},
+ {agg_func}
+ FROM {matched_rows}
+ GROUP BY {partition_expr}
+ """.format(**locals()))
+ if not persist_rows:
+ plpy.execute("DROP VIEW IF EXISTS " + matched_rows)
+ plpy.execute("DROP TABLE IF EXISTS " + input_with_id)
+ plpy.execute("DROP SEQUENCE IF EXISTS " + seq_gen)
+ result = "Result available in table " + output_table
+ if persist_rows:
+ result += "\n Matched tuples can be found in table " + matched_rows
+ return result
# ------------------------------------------------------------------------------
@@ -139,6 +263,11 @@ def _validate(source_table, output_table, partition_expr, order_expr,
if persist_rows:
output_tbl_valid(add_postfix(output_table, "_tuples"), 'Path')
+ plpy.execute("""SELECT {partition_expr}
+ FROM {source_table}
+ ORDER BY {order_expr}
+ LIMIT 0
+ """.format(**locals()))
# ensure the expressions are not None or empty strings
_assert(partition_expr, "Path error: Invalid partition expression")
_assert(order_expr, "Path error: Invalid order expression")
@@ -168,6 +297,7 @@ def _parse_symbol_str(symbol_expr):
WHEN MARKET THEN start >= \'9:30:00\' and start < \'16:00:00\'
END"
"""
+
all_symbols = iter(string.ascii_lowercase + string.digits)
symbol_expr_parser = shlex.shlex(symbol_expr)
symbol_expr_parser.wordchars = [i for i in string.printable
@@ -217,8 +347,8 @@ SELECT {schema_madlib}.path(
'output_table', -- Table name to store the path results
'partition_expr', -- Partition expression to group the data table
'order_expr', -- Order expression to sort the tuples of the data table
- 'pattern_def', -- Definition of the path pattern to search for
'symbol_def', -- Definition of various symbols used in the pattern definition
+ 'pattern_def', -- Definition of the path pattern to search for
'agg_func', -- Aggregate/window functions to be applied on the matched paths
persist_rows -- Boolean indicating whether to output the matched rows
-- in an additional table (named <output_table>_tuples)
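To make the index arithmetic described in the comments of path.py_in
concrete, consider a hypothetical partition whose concatenated symbol string
is 'aabxxcxxxd' and whose pattern is 'x+' (values invented purely for
illustration):

    SELECT regexp_split_to_array('aabxxcxxxd', 'x+');        -- {aab,c,d}
    SELECT (regexp_matches('aabxxcxxxd', '(x+)', 'g'))[1];   -- xx and xxx, one row per match
    -- match_splits (lengths between matches, truncated)   = {3,1}
    -- matches      (lengths of the matches)                = {2,3}
    -- prev_matches (matches shifted right, 1 prepended)    = {1,2}
    -- left_range  = array_cum_sum(array_add({3,1}, {1,2})) = array_cum_sum({4,3}) = {4,7}
    -- right_range = array_cum_sum(array_add({3,1}, {2,3})) = array_cum_sum({5,4}) = {5,9}
    -- so matched_ids[4:5] and matched_ids[7:9] pick out the rows of the two matches.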
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/path.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/path.sql_in b/src/ports/postgres/modules/utilities/path.sql_in
index db06fb6..a538449 100644
--- a/src/ports/postgres/modules/utilities/path.sql_in
+++ b/src/ports/postgres/modules/utilities/path.sql_in
@@ -59,8 +59,8 @@ path(
output_table,
partition_expr,
order_expr,
- pattern,
symbol,
+ pattern,
aggregate_func,
persist_rows
)
@@ -72,19 +72,26 @@ path(
<dd>VARCHAR. Name of the source table, containing data for path analysis.</dd>
<dt>output_table</dt>
- <dd>VARCHAR. Name of the result table. This table contains columns predicated
- by the <em>result</em> argument (shown below).</dd>
+ <dd>VARCHAR. Name of the result table. This table contains columns
+ predicated by the <em>result</em> argument (shown below).</dd>
<dt>partition_expr</dt>
- <dd>VARCHAR. The 'partition_expr' can be a list of columns or expressions (separated by comma)
- to divide all rows into groups, or partitions, that share the same values of the
- partition expression(s). For each row, the matching is applied across the rows
- that fall into the same partition. This can be NULL or '' to indicate the matching
- is to be applied on the whole table.</dd>
+ <dd>VARCHAR. The 'partition_expr' can be a single column or a list of
+ comma-separated columns/expressions to divide all rows into groups,
+ or partitions, that share the same values of the partition expression(s).
+ The matching is applied across the rows that fall into the same partition.
+ This can be NULL or '' to indicate the matching is to be applied on the
+ whole table.</dd>
<dt>order_expr</dt>
- <dd>VARCHAR. This expression controls the order in which rows are processed or matched in a
- partition.
+ <dd>VARCHAR. This expression controls the order in which rows are
+ processed or matched in a partition.
+ </dd>
+
+ <dt>symbol</dt>
+ <dd>VARCHAR. A symbol represents a row of a particular type that you’re
+ searching for as part of a row sequence. In the SYMBOLS clause, you write a
+ predicate to define the type of row that matches the symbol.
</dd>
<dt>pattern</dt>
@@ -104,12 +111,6 @@ path(
</ul>
</dd>
- <dt>symbol</dt>
- <dd>VARCHAR. A symbol represents a row of a particular type that you’re searching
-for as part of a row sequence. In the SYMBOLS clause, you write a predicate to define the
-type of row that matches the symbol.
- </dd>
-
<dt>aggregate_func</dt>
<dd>VARCHAR. A comma-separated list of window functions and aggregates to be
applied on the matched window.
@@ -129,8 +130,8 @@ type of row that matches the symbol.
- Build sample dataset
<pre class="example">
-CREATE TABLE data (id integer, sessionid integer, starttime timestamp, topic varchar, portfolio integer);
-COPY data FROM STDIN DELIMITER AS '|';
+CREATE TABLE trades (id integer, sessionid integer, starttime timestamp, topic varchar, portfolio integer);
+COPY trades FROM STDIN DELIMITER AS '|';
1|1|1/01/00 5:00 AM|Real-time Equity pricing|769
1|2|1/01/00 5:30 AM|Real-time Index pricing|9898
1|3|1/01/00 6:00 AM|Real-time Index pricing|9898
@@ -166,15 +167,6 @@ COPY data FROM STDIN DELIMITER AS '|';
2|13|1/04/00 12:00 AM|Bond pricing|55
2|14|1/04/00 12:00 AM|Bond pricing|55
\\.
-
-CREATE TABLE trades AS
-SELECT *, starttimestamp::date startdate,
- CASE WHEN THEN 'before'
- WHEN THEN 'market'
- WHEN THEN 'close'
- WHEN THEN 'after'
- END tradingperiod
-FROM sessiontable
</pre>
- Compare within each day for the same Topic/Portfolio across every user
@@ -182,15 +174,14 @@ FROM sessiontable
SELECT madlib.path(
'trades',
'trades_out',
- 'startdate, topic, portfolio' \-\- each day of activity is looked at independently
- 'starttime' \-\- order by time
- 'BEFORE*.MARKET+.CLOSE+.AFTER*' \-\- at least one event during each of MARKET and CLOSE, but gather up the rest
- 'BEFORE:=starttimestamp::time >= ''0:00:00'' and starttimestamp::time < ''9:30:00''::time,
- MARKET:=starttimestamp::time >= ''9:30:00'' and starttimestamp::time < ''16:00:00''::time,
- CLOSE:= starttimestamp::time <= ''16:00:00'' and starttimestamp::time < ''16:30:00''::time,
- AFTER:= starttimestamp::time <= ''16:30:00'' and starttimestamp::time < ''24:00:00''::time
- ',
- 'first(startdate) as starttime, array_agg(id) as all_users, count(*) as num_matches'
+ 'starttime::date, topic, portfolio', \-\- each day of activity is looked at independently
+ 'starttime', \-\- order by time
+ 'BEFORE:=starttime::time >= ''0:00:00'' and starttime::time < ''9:30:00''::time,'
+ 'MARKET:=starttime::time >= ''9:30:00'' and starttime::time < ''16:00:00''::time,'
+ 'CLOSE:= starttime::time <= ''16:00:00'' and starttime::time < ''16:30:00''::time,'
+ 'AFTER:= starttime::time <= ''16:30:00'' and starttime::time < ''24:00:00''::time',
+ 'BEFORE*.MARKET+.CLOSE+.AFTER*', \-\- at least one event during each of MARKET and CLOSE, but gather up the rest
+ 'count(*) as num_matches'
)
</pre>
*/
@@ -202,11 +193,11 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(
output_table VARCHAR,
partition_expr VARCHAR,
order_expr VARCHAR,
- pattern_expr VARCHAR,
symbol_expr VARCHAR,
+ pattern_expr VARCHAR,
agg_func VARCHAR,
persist_rows BOOLEAN
-) RETURNS void AS $$
+) RETURNS TEXT AS $$
PythonFunction(utilities, path, path)
$$ LANGUAGE plpythonu
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -219,10 +210,10 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.path(
output_table VARCHAR,
partition_expr VARCHAR,
order_expr VARCHAR,
- pattern_expr VARCHAR,
symbol_expr VARCHAR,
+ pattern_expr VARCHAR,
agg_func VARCHAR
-) RETURNS void AS $$
+) RETURNS TEXT AS $$
SELECT MADLIB_SCHEMA.path($1, $2, $3, $4, $5, $6, $7, FALSE)
$$ LANGUAGE SQL
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/test/path.sql_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/path.sql_in b/src/ports/postgres/modules/utilities/test/path.sql_in
index 44d89c8..c568938 100644
--- a/src/ports/postgres/modules/utilities/test/path.sql_in
+++ b/src/ports/postgres/modules/utilities/test/path.sql_in
@@ -84,8 +84,8 @@ SELECT madlib.path(
'"Path_output"', -- Table name to store the path results
'user_id', -- Partition expression to group the data table
'event_timestamp ASC', -- Order expression to sort the tuples of the data table
- 'I(click){1}(CONV){1}', -- Definition of the path pattern to search for
'I:=click_event=0 AND purchase_event=0, Click:=click_event=1 AND purchase_event=0, Conv:=purchase_event=1', -- Definition of various symbols used in the pattern definition
+ 'I(click){1}(CONV){1}', -- Definition of the path pattern to search for
'COUNT(*)' -- Aggregate/window functions to be applied on the matched paths
,TRUE
);
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/33c0a72c/src/ports/postgres/modules/utilities/validate_args.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/validate_args.py_in b/src/ports/postgres/modules/utilities/validate_args.py_in
index f3591d7..7681489 100644
--- a/src/ports/postgres/modules/utilities/validate_args.py_in
+++ b/src/ports/postgres/modules/utilities/validate_args.py_in
@@ -1,6 +1,9 @@
+
import plpy
-from utilities import __mad_version
import re
+import string
+from utilities import __mad_version
+
version_wrapper = __mad_version()
_string_to_array = version_wrapper.select_vecfunc()
@@ -24,7 +27,7 @@ DELETE, or UPDATE statements).
"""
-def _unquote_name(input_str):
+def unquote_ident(input_str):
"""
Returns input_str with starting and trailing double quotes stripped
@@ -51,6 +54,42 @@ def _unquote_name(input_str):
# -------------------------------------------------------------------------
+def quote_ident(input_str):
+ """
+ Returns input_str with quotes added per Postgres identifier rules.
+
+ This function is available via plpy.quote_ident in PG > 9.1. We add this
+ function for compatibility with Greenplum and HAWQ.
+
+ If the input_str is a lower case string with characters in [a-z0-9_] then the
+ string is returned as is, else a double quote is added in front and back of the string.
+ Every double quote in the original string is preceeded by another double qoute.
+
+ Note: we don't check for SQL keywords. plpy.quote_ident is a better alternative
+ when available.
+
+ Args:
+ @param input_str
+
+ Returns:
+ String
+ """
+
+ def quote_not_needed(ch):
+ return (ch in string.ascii_lowercase or ch in string.digits or ch == '_')
+
+ if input_str:
+ input_str = input_str.strip()
+ if all(quote_not_needed(c) for c in input_str):
+ return input_str
+ else:
+ # if input_str has double quotes then each double quote
+ # is prepended with a double quote
+ # (the 1st double quote is used to escape the 2nd double quote)
+ return '"' + re.sub(r'"', r'""', input_str) + '"'
+# -------------------------------------------------------------------------
+
+
def _get_table_schema_names(tbl, only_first_schema=False):
"""
Returns a pair containing a set of schema names and the table name from
@@ -80,12 +119,12 @@ def _get_table_schema_names(tbl, only_first_schema=False):
else:
all_schemas = _string_to_array(plpy.execute(
"SELECT current_schemas(True) ""AS cs")[0]["cs"])
- schema_str = "('{0}')".format("','".join(_unquote_name(s)
+ schema_str = "('{0}')".format("','".join(unquote_ident(s)
for s in all_schemas))
- table = _unquote_name(names[0])
+ table = unquote_ident(names[0])
elif len(names) == 2:
- schema_str = "('" + _unquote_name(names[0]) + "')"
- table = _unquote_name(names[1])
+ schema_str = "('" + unquote_ident(names[0]) + "')"
+ table = unquote_ident(names[1])
else:
plpy.error("Incorrect table name ({0}) provided! Table name "
"should be of the form: <schema name>.<table name>".format(tbl))
@@ -199,7 +238,7 @@ def get_first_schema(table_name):
raise TypeError("Incorrect table name ({0}) provided! Table name should be "
"of the form: <schema name>.<table name>".format(table_name))
elif len(names) == 2:
- return _unquote_name(names[0])
+ return unquote_ident(names[0])
# create a list of schema names in search path
# _string_to_array is used for GPDB versions less than 4.2 where an array
@@ -342,9 +381,9 @@ def columns_exist_in_table(tbl, cols, schema_madlib="madlib"):
Returns:
True if all columns in 'cols' exist in source table else False
"""
- existing_cols = set(_unquote_name(i) for i in get_cols(tbl, schema_madlib))
+ existing_cols = set(unquote_ident(i) for i in get_cols(tbl, schema_madlib))
for col in cols:
- if not col or _unquote_name(col) not in existing_cols:
+ if not col or unquote_ident(col) not in existing_cols:
return False
return True
# -------------------------------------------------------------------------
@@ -362,11 +401,11 @@ def columns_missing_from_table(tbl, cols):
"""
if not cols:
return []
- existing_cols = set(_unquote_name(i) for i in get_cols(tbl))
+ existing_cols = set(unquote_ident(i) for i in get_cols(tbl))
# column is considered missing if the name is invalid (None or empty) or
# if the column is not present in the table
return [col for col in cols
- if not col or _unquote_name(col) not in existing_cols]
+ if not col or unquote_ident(col) not in existing_cols]
# -------------------------------------------------------------------------
@@ -387,7 +426,7 @@ def is_col_array(tbl, col):
plpy.error("Input error: Invalid table {0}".format(tbl))
if not col:
plpy.error("Input error: Invalid column name {0}".format(col))
- col = _unquote_name(col)
+ col = unquote_ident(col)
data_type_list = plpy.execute(
"""
@@ -563,10 +602,10 @@ class TestValidateFunctions(unittest.TestCase):
_get_table_schema_names('test_schema.test_table'))
self.assertEqual(('"test_schema"', '"test_table"'),
_get_table_schema_names('"test_schema"."test_table"'))
- self.assertEqual('Test', _unquote_name('"Test"'))
- self.assertEqual('test', _unquote_name('Test'))
- self.assertEqual('Test123', _unquote_name('"Test123"'))
- self.assertEqual('test', _unquote_name('"test"'))
+ self.assertEqual('Test', unquote_ident('"Test"'))
+ self.assertEqual('test', unquote_ident('Test'))
+ self.assertEqual('Test123', unquote_ident('"Test123"'))
+ self.assertEqual('test', unquote_ident('"test"'))
if __name__ == '__main__':
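As a side note on the new quote_ident helper: PostgreSQL's built-in
quote_ident() follows the same rules (quote only when needed, double any
embedded quote), except that the built-in also quotes reserved keywords,
which the helper deliberately does not check. A quick illustration with the
built-in:

    SELECT quote_ident('test');    -- test        (plain lower-case identifier, left as is)
    SELECT quote_ident('Test');    -- "Test"      (mixed case forces quoting)
    SELECT quote_ident('te"st');   -- "te""st"    (embedded double quote is doubled)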