You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@age.apache.org by jg...@apache.org on 2022/07/07 16:25:16 UTC

[age] branch master updated: Fix MERGE visibility in chained commands, SET specifically.

This is an automated email from the ASF dual-hosted git repository.

jgemignani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/age.git


The following commit(s) were added to refs/heads/master by this push:
     new 99e7c62  Fix MERGE visibility in chained commands, SET specifically.
99e7c62 is described below

commit 99e7c625d97782849e39bca93daca07b35c60772
Author: John Gemignani <jr...@gmail.com>
AuthorDate: Wed Jul 6 16:26:14 2022 -0700

    Fix MERGE visibility in chained commands, SET specifically.
    
    Background:
    
    The MERGE command searches for a pattern to return. If that pattern
    isn't found, it will create it and return the newly created pattern.
    In both cases, MERGE will return one or more tuples.
    
    When MERGE is chained with other commands, the tuples generated
    (either found or created) by MERGE are passed up to the parent
    commands. Those parent commands can then make modifications to
    those tuples, if necessary.
    
    Issue:
    
    The issue here was that the newly created tuples were not visible,
    meaning the currentCommandId used to create them was not strictly
    less than the currentCommandId used to update them further in the
    chain.
    
    The error with the processed tuples would show up on the surface (the
    return value) as being correct. However, after inspection, it would
    be shown that the tuple wasn't actually modified by any of the chained
    commands. Note, this was only the case for newly created tuples.
    
    merge (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n;
                                            n
    ----------------------------------------------------------------------------------
     {"id": 844424930131970, "label": "node", "properties": {"name": "Lisa"}}::vertex
    (1 row)
    
    match (n) return n;
                                            n
    ----------------------------------------------------------------------------------
     {"id": 844424930131970, "label": "node", "properties": {"name": "Jason"}}::vertex
    (1 row)
    
    To fix this, the currentCommandId needed to be incremented after creating
    a tuple in MERGE. This would allow the chained commands that follow, to
    see it.  However, the currentCommandId used by the MERGE still needed to
    remain the same. This made it necessary to create a field in the custom
    scan node to hold the original currentCommandId for the MERGE instances'
    updates. What this does is to always keep the newly created MERGE tuples
    updated with a currentCommandId that is always 1 less than the currrent
    currentCommandId.
    
    Doing this corrected the issue.
    
    merge (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n;
                                            n
    ----------------------------------------------------------------------------------
     {"id": 844424930131971, "label": "node", "properties": {"name": "Lisa"}}::vertex
    (1 row)
    
    match (n) return n;
    ----------------------------------------------------------------------------------
     {"id": 844424930131971, "label": "node", "properties": {"name": "Lisa"}}::vertex
    (1 row)
    
    Added regression tests.
---
 regress/expected/cypher_merge.out   | 49 +++++++++++++++++++++++++-
 regress/sql/cypher_merge.sql        | 10 +++++-
 src/backend/executor/cypher_merge.c | 70 +++++++++++++++++++++++++++++--------
 src/backend/executor/cypher_utils.c | 33 ++++++++++++-----
 src/include/executor/cypher_utils.h |  4 +++
 5 files changed, 142 insertions(+), 24 deletions(-)

diff --git a/regress/expected/cypher_merge.out b/regress/expected/cypher_merge.out
index 901cbb0..389c34d 100644
--- a/regress/expected/cypher_merge.out
+++ b/regress/expected/cypher_merge.out
@@ -836,7 +836,54 @@ SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype
 (1 row)
 
 -- Node doesn't exist, is created, then set
--- to be added -jrg
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) DELETE n $$) AS (n agtype);
+ n 
+---
+(0 rows)
+
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+ n 
+---
+(0 rows)
+
+SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n $$) AS (n agtype);
+                                         n                                         
+-----------------------------------------------------------------------------------
+ {"id": 2533274790395906, "label": "node", "properties": {"name": "Lisa"}}::vertex
+(1 row)
+
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+                                         n                                         
+-----------------------------------------------------------------------------------
+ {"id": 2533274790395906, "label": "node", "properties": {"name": "Lisa"}}::vertex
+(1 row)
+
+-- Multiple SETs
+SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Lisa'}) SET n.age = 23, n.gender = "Female" RETURN n $$) AS (n agtype);
+                                                        n                                                         
+------------------------------------------------------------------------------------------------------------------
+ {"id": 2533274790395906, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
+(1 row)
+
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+                                                        n                                                         
+------------------------------------------------------------------------------------------------------------------
+ {"id": 2533274790395906, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
+(1 row)
+
+SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa', n.age = 23, n.gender = 'Female' RETURN n $$) AS (n agtype);
+                                                        n                                                         
+------------------------------------------------------------------------------------------------------------------
+ {"id": 2533274790395907, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
+(1 row)
+
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+                                                        n                                                         
+------------------------------------------------------------------------------------------------------------------
+ {"id": 2533274790395906, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
+ {"id": 2533274790395907, "label": "node", "properties": {"age": 23, "name": "Lisa", "gender": "Female"}}::vertex
+(2 rows)
+
 --clean up
 SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype);
  a 
diff --git a/regress/sql/cypher_merge.sql b/regress/sql/cypher_merge.sql
index cfead8f..6cd4207 100644
--- a/regress/sql/cypher_merge.sql
+++ b/regress/sql/cypher_merge.sql
@@ -469,7 +469,15 @@ SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.nam
 SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
 
 -- Node doesn't exist, is created, then set
--- to be added -jrg
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) DELETE n $$) AS (n agtype);
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa' RETURN n $$) AS (n agtype);
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+-- Multiple SETs
+SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Lisa'}) SET n.age = 23, n.gender = "Female" RETURN n $$) AS (n agtype);
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
+SELECT * FROM cypher('cypher_merge', $$ MERGE (n:node {name: 'Jason'}) SET n.name = 'Lisa', n.age = 23, n.gender = 'Female' RETURN n $$) AS (n agtype);
+SELECT * FROM cypher('cypher_merge', $$ MATCH (n:node) RETURN n $$) AS (n agtype);
 
 --clean up
 SELECT * FROM cypher('cypher_merge', $$MATCH (n) DETACH DELETE n $$) AS (a agtype);
diff --git a/src/backend/executor/cypher_merge.c b/src/backend/executor/cypher_merge.c
index a409572..c65abf2 100644
--- a/src/backend/executor/cypher_merge.c
+++ b/src/backend/executor/cypher_merge.c
@@ -152,8 +152,6 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate,
             cypher_node->prop_expr_state =
                 ExecInitExpr(cypher_node->prop_expr, (PlanState *)node);
         }
-
-
     }
 
     /*
@@ -166,6 +164,9 @@ static void begin_cypher_merge(CustomScanState *node, EState *estate,
     if (estate->es_output_cid == 0)
         estate->es_output_cid = estate->es_snapshot->curcid;
 
+    /* store the currentCommandId for this instance */
+    css->base_currentCommandId = GetCurrentCommandId(false);
+
     Increment_Estate_CommandId(estate);
 }
 
@@ -656,18 +657,57 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
 
         ExecClearTuple(elemTupleSlot);
 
-        // get the next graphid for this vertex.
+        /* get the next graphid for this vertex */
         id = ExecEvalExpr(node->id_expr_state, econtext, &isNull);
         elemTupleSlot->tts_values[vertex_tuple_id] = id;
         elemTupleSlot->tts_isnull[vertex_tuple_id] = isNull;
 
-        // get the properties for this vertex
+        /* get the properties for this vertex */
         prop = ExecEvalExpr(node->prop_expr_state, econtext, &isNull);
         elemTupleSlot->tts_values[vertex_tuple_properties] = prop;
         elemTupleSlot->tts_isnull[vertex_tuple_properties] = isNull;
 
-        // Insert the new vertex
-        insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);
+        /*
+         * Insert the new vertex.
+         *
+         * Depending on the currentCommandId, we need to do this one of two
+         * different ways -
+         *
+         * 1) If they are equal, the currentCommandId hasn't been used for an
+         *    update, or it hasn't been incremented after being used. In either
+         *    case, we need to use the current one and then increment it so that
+         *    the following commands will have visibility of this update. Note,
+         *    it isn't our job to update the currentCommandId first and then do
+         *    this check.
+         *
+         * 2) If they are not equal, the currentCommandId has been used and/or
+         *    updated. In this case, we can't use it. Otherwise our update won't
+         *    be visible to anything that follows, until the currentCommandId is
+         *    updated again. Remember, visibility is, greater than but not equal
+         *    to, the currentCommandID used for the update. So, in this case we
+         *    need to use the original currentCommandId when begin_cypher_merge
+         *    was initiated as everything under this instance of merge needs to
+         *    be based off of that initial currentCommandId. This allows the
+         *    following command to see the updates generated by this instance of
+         *    merge.
+         */
+        if (css->base_currentCommandId == GetCurrentCommandId(false))
+        {
+            insert_entity_tuple(resultRelInfo, elemTupleSlot, estate);
+
+            /*
+             * Increment the currentCommandId since we processed an update. We
+             * don't want to do this outside of this block because we don't want
+             * to inadvertently or unnecessarily update the commandCounterId of
+             * another command.
+             */
+            CommandCounterIncrement();
+        }
+        else
+        {
+            insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate,
+                                    css->base_currentCommandId);
+        }
 
         /* restore the old result relation info */
         estate->es_result_relation_info = old_estate_es_result_relation_info;
@@ -682,11 +722,11 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
         {
             Datum result;
 
-            // make the vertex agtype
+            /* make the vertex agtype */
             result = make_vertex(
                 id, CStringGetDatum(node->label_name), prop);
 
-            // append to the path list
+            /* append to the path list */
             if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
             {
                 css->path_values = lappend(css->path_values,
@@ -724,11 +764,11 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
                  node->variable_name)));
         }
 
-        // get the vertex agtype in the scanTupleSlot
+        /* get the vertex agtype in the scanTupleSlot */
         d = scantuple->tts_values[node->tuple_position - 1];
         a = DATUM_GET_AGTYPE_P(d);
 
-        // Convert to an agtype value
+        /* Convert to an agtype value */
         v = get_ith_agtype_value_from_container(&a->root, 0);
 
         if (v->type != AGTV_VERTEX)
@@ -736,10 +776,10 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                      errmsg("agtype must resolve to a vertex")));
 
-        // extract the id agtype field
+        /* extract the id agtype field */
         id_value = GET_AGTYPE_VALUE_OBJECT_VALUE(v, "id");
 
-        // extract the graphid and cast to a Datum
+        /* extract the graphid and cast to a Datum */
         id = GRAPHID_GET_DATUM(id_value->val.int_value);
 
         /*
@@ -764,7 +804,9 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
             }
         }
 
-        // add the Datum to the list of entities for creating the path variable
+        /*
+         * Add the Datum to the list of entities for creating the path variable
+         */
         if (CYPHER_TARGET_NODE_IN_PATH(node->flags))
         {
             Datum vertex = scanTupleSlot->tts_values[node->tuple_position - 1];
@@ -773,7 +815,7 @@ static Datum merge_vertex(cypher_merge_custom_scan_state *css,
         }
     }
 
-    // If the path continues, create the next edge, passing the vertex's id.
+    /* If the path continues, create the next edge, passing the vertex's id. */
     if (next != NULL)
     {
         merge_edge(css, lfirst(next), id, lnext(next));
diff --git a/src/backend/executor/cypher_utils.c b/src/backend/executor/cypher_utils.c
index 8884ea1..3558fe6 100644
--- a/src/backend/executor/cypher_utils.c
+++ b/src/backend/executor/cypher_utils.c
@@ -207,30 +207,47 @@ bool entity_exists(EState *estate, Oid graph_oid, graphid id)
 }
 
 /*
- * Insert the edge/vertex tuple into the table and indices. If the table's
- * constraints have not been violated.
+ * Insert the edge/vertex tuple into the table and indices. Check that the
+ * table's constraints have not been violated.
+ *
+ * This function defaults to, and flags for update, the currentCommandId. If
+ * you need to pass a specific cid and avoid using the currentCommandId, use
+ * insert_entity_tuple_cid instead.
  */
 HeapTuple insert_entity_tuple(ResultRelInfo *resultRelInfo,
                               TupleTableSlot *elemTupleSlot,
                               EState *estate)
 {
-    HeapTuple tuple;
+    return insert_entity_tuple_cid(resultRelInfo, elemTupleSlot, estate,
+                                   GetCurrentCommandId(true));
+}
+
+/*
+ * Insert the edge/vertex tuple into the table and indices. Check that the
+ * table's constraints have not been violated.
+ *
+ * This function uses the passed cid for updates.
+ */
+HeapTuple insert_entity_tuple_cid(ResultRelInfo *resultRelInfo,
+                                  TupleTableSlot *elemTupleSlot,
+                                  EState *estate, CommandId cid)
+{
+    HeapTuple tuple = NULL;
 
     ExecStoreVirtualTuple(elemTupleSlot);
     tuple = ExecMaterializeSlot(elemTupleSlot);
 
-    // Check the constraints of the tuple
+    /* Check the constraints of the tuple */
     tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     if (resultRelInfo->ri_RelationDesc->rd_att->constr != NULL)
     {
         ExecConstraints(resultRelInfo, elemTupleSlot, estate);
     }
 
-    // Insert the tuple normally
-    heap_insert(resultRelInfo->ri_RelationDesc, tuple,
-                GetCurrentCommandId(true), 0, NULL);
+    /* Insert the tuple using the passed in cid */
+    heap_insert(resultRelInfo->ri_RelationDesc, tuple, cid, 0, NULL);
 
-    // Insert index entries for the tuple
+    /* Insert index entries for the tuple */
     if (resultRelInfo->ri_NumIndices > 0)
     {
         ExecInsertIndexTuples(elemTupleSlot, &(tuple->t_self), estate, false,
diff --git a/src/include/executor/cypher_utils.h b/src/include/executor/cypher_utils.h
index 55c3c1e..4d19937 100644
--- a/src/include/executor/cypher_utils.h
+++ b/src/include/executor/cypher_utils.h
@@ -87,6 +87,7 @@ typedef struct cypher_merge_custom_scan_state
     AttrNumber merge_function_attr;
     bool created_new_path;
     bool found_a_path;
+    CommandId base_currentCommandId;
 } cypher_merge_custom_scan_state;
 
 TupleTableSlot *populate_vertex_tts(TupleTableSlot *elemTupleSlot,
@@ -103,5 +104,8 @@ bool entity_exists(EState *estate, Oid graph_oid, graphid id);
 HeapTuple insert_entity_tuple(ResultRelInfo *resultRelInfo,
                               TupleTableSlot *elemTupleSlot,
                               EState *estate);
+HeapTuple insert_entity_tuple_cid(ResultRelInfo *resultRelInfo,
+                                  TupleTableSlot *elemTupleSlot,
+                                  EState *estate, CommandId cid);
 
 #endif