You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by je...@apache.org on 2022/06/06 03:31:16 UTC

[incubator-hugegraph] 02/05: implement for other backends

This is an automated email from the ASF dual-hosted git repository.

jermy pushed a commit to branch schema-update-bug-rebase
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit ce5da349f3fd29cbfbcd19d487631055e5cb37fc
Author: Zhangmei Li <li...@baidu.com>
AuthorDate: Mon Feb 28 20:11:00 2022 +0800

    implement for other backends
    
    Change-Id: Iab9ab42ba0504664ce3320c8905b2d92d01d849a
---
 .../backend/store/cassandra/CassandraStore.java    | 54 +++++++++++++---------
 .../backend/store/cassandra/CassandraTable.java    | 14 ++++++
 .../hugegraph/backend/store/BackendTable.java      |  7 ++-
 .../hugegraph/backend/store/hbase/HbaseStore.java  |  6 +++
 .../hugegraph/backend/store/hbase/HbaseTable.java  |  8 ++++
 .../hugegraph/backend/store/mysql/MysqlStore.java  |  6 +++
 .../hugegraph/backend/store/mysql/MysqlTable.java  | 18 +++++++-
 7 files changed, 85 insertions(+), 28 deletions(-)

diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java
index 75bf12f0c..147664312 100644
--- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java
+++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java
@@ -227,17 +227,25 @@ public abstract class CassandraStore
             LOG.warn("The entry will be ignored due to no change: {}", entry);
         }
 
+        CassandraTable table;
+        if (!entry.olap()) {
+            // Oltp table
+            table = this.table(entry.type());
+        } else {
+            if (entry.type().isIndex()) {
+                // Olap index
+                table = this.table(this.olapTableName(entry.type()));
+            } else {
+                // Olap vertex
+                table = this.table(this.olapTableName(entry.subId()));
+            }
+        }
+
         switch (item.action()) {
             case INSERT:
-                // Insert olap vertex
-                if (entry.olap()) {
-                    this.table(this.olapTableName(entry.subId()))
-                        .insert(session, entry.row());
-                    break;
-                }
                 // Insert entry
                 if (entry.selfChanged()) {
-                    this.table(entry.type()).insert(session, entry.row());
+                    table.insert(session, entry.row());
                 }
                 // Insert sub rows (edges)
                 for (CassandraBackendEntry.Row row : entry.subRows()) {
@@ -245,15 +253,9 @@ public abstract class CassandraStore
                 }
                 break;
             case DELETE:
-                // Delete olap vertex index by index label
-                if (entry.olap()) {
-                    this.table(this.olapTableName(entry.type()))
-                        .delete(session, entry.row());
-                    break;
-                }
                 // Delete entry
                 if (entry.selfChanged()) {
-                    this.table(entry.type()).delete(session, entry.row());
+                    table.delete(session, entry.row());
                 }
                 // Delete sub rows (edges)
                 for (CassandraBackendEntry.Row row : entry.subRows()) {
@@ -261,15 +263,9 @@ public abstract class CassandraStore
                 }
                 break;
             case APPEND:
-                // Append olap vertex index
-                if (entry.olap()) {
-                    this.table(this.olapTableName(entry.type()))
-                        .append(session, entry.row());
-                    break;
-                }
                 // Append entry
                 if (entry.selfChanged()) {
-                    this.table(entry.type()).append(session, entry.row());
+                    table.append(session, entry.row());
                 }
                 // Append sub rows (edges)
                 for (CassandraBackendEntry.Row row : entry.subRows()) {
@@ -279,13 +275,27 @@ public abstract class CassandraStore
             case ELIMINATE:
                 // Eliminate entry
                 if (entry.selfChanged()) {
-                    this.table(entry.type()).eliminate(session, entry.row());
+                    table.eliminate(session, entry.row());
                 }
                 // Eliminate sub rows (edges)
                 for (CassandraBackendEntry.Row row : entry.subRows()) {
                     this.table(row.type()).eliminate(session, row);
                 }
                 break;
+            case UPDATE_IF_PRESENT:
+                if (entry.selfChanged()) {
+                    // TODO: forward to master-writer node
+                    table.updateIfPresent(session, entry.row());
+                }
+                assert entry.subRows().isEmpty() : entry.subRows();
+                break;
+            case UPDATE_IF_ABSENT:
+                if (entry.selfChanged()) {
+                    // TODO: forward to master-writer node
+                    table.updateIfAbsent(session, entry.row());
+                }
+                assert entry.subRows().isEmpty() : entry.subRows();
+                break;
             default:
                 throw new AssertionError(String.format(
                           "Unsupported mutate action: %s", item.action()));
diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java
index 548726b4a..f9d65fe0f 100644
--- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java
+++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java
@@ -38,6 +38,7 @@ import com.baidu.hugegraph.backend.page.PageState;
 import com.baidu.hugegraph.backend.query.Aggregate;
 import com.baidu.hugegraph.backend.query.Condition;
 import com.baidu.hugegraph.backend.query.Condition.Relation;
+import com.baidu.hugegraph.backend.query.IdQuery;
 import com.baidu.hugegraph.backend.query.Query;
 import com.baidu.hugegraph.backend.query.Query.Order;
 import com.baidu.hugegraph.backend.store.BackendEntry;
@@ -46,6 +47,7 @@ import com.baidu.hugegraph.backend.store.Shard;
 import com.baidu.hugegraph.exception.NotFoundException;
 import com.baidu.hugegraph.exception.NotSupportException;
 import com.baidu.hugegraph.iterator.ExtendableIterator;
+import com.baidu.hugegraph.iterator.WrappedIterator;
 import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.type.define.HugeKeys;
 import com.baidu.hugegraph.util.CopyUtil;
@@ -97,6 +99,18 @@ public abstract class CassandraTable
         });
     }
 
+    @Override
+    public boolean queryExist(CassandraSessionPool.Session session,
+                              CassandraBackendEntry.Row entry) {
+        Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id());
+        Iterator<BackendEntry> iter = this.query(session, query);
+        try {
+            return iter.hasNext();
+        } finally {
+            WrappedIterator.close(iter);
+        }
+    }
+
     @Override
     public Number queryNumber(CassandraSessionPool.Session session,
                               Query query) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java
index f15f97dc7..9753076eb 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java
@@ -69,6 +69,7 @@ public abstract class BackendTable<Session extends BackendSession, Entry> {
     public void updateIfPresent(Session session, Entry entry) {
         // TODO: use fine-grained row lock
         synchronized (this.table) {
+            assert !session.hasChanges();
             if (this.queryExist(session, entry)) {
                 this.insert(session, entry);
                 if (session != null) {
@@ -81,6 +82,7 @@ public abstract class BackendTable<Session extends BackendSession, Entry> {
     public void updateIfAbsent(Session session, Entry entry) {
         // TODO: use fine-grained row lock
         synchronized (this.table) {
+            assert !session.hasChanges();
             if (!this.queryExist(session, entry)) {
                 this.insert(session, entry);
                 if (session != null) {
@@ -136,10 +138,7 @@ public abstract class BackendTable<Session extends BackendSession, Entry> {
 
     public abstract Number queryNumber(Session session, Query query);
 
-//    public abstract boolean queryExist(Session session, Entry entry);
-    public boolean queryExist(Session session, Entry entry) {
-        return false;
-    }
+    public abstract boolean queryExist(Session session, Entry entry);
 
     public abstract void insert(Session session, Entry entry);
 
diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java
index 628048a5d..9cd5c233a 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java
@@ -215,6 +215,12 @@ public abstract class HbaseStore extends AbstractBackendStore<Session> {
             case ELIMINATE:
                 table.eliminate(session, entry);
                 break;
+            case UPDATE_IF_PRESENT:
+                table.updateIfPresent(session, entry);
+                break;
+            case UPDATE_IF_ABSENT:
+                table.updateIfAbsent(session, entry);
+                break;
             default:
                 throw new AssertionError(String.format(
                           "Unsupported mutate action: %s", item.action()));
diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
index 3cca9a5dc..b4009dd9c 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
@@ -144,6 +144,14 @@ public class HbaseTable extends BackendTable<Session, BackendEntry> {
         this.delete(session, entry);
     }
 
+    @Override
+    public boolean queryExist(Session session, BackendEntry entry) {
+        Id id = entry.id();
+        try (RowIterator iter = this.queryById(session, id)) {
+            return iter.hasNext();
+        }
+    }
+
     @Override
     public Number queryNumber(Session session, Query query) {
         Aggregate aggregate = query.aggregateNotNull();
diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java
index 858b22edb..90318c8db 100644
--- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java
+++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java
@@ -260,6 +260,12 @@ public abstract class MysqlStore extends AbstractBackendStore<Session> {
             case ELIMINATE:
                 table.eliminate(session, entry.row());
                 break;
+            case UPDATE_IF_PRESENT:
+                table.updateIfPresent(session, entry.row());
+                break;
+            case UPDATE_IF_ABSENT:
+                table.updateIfAbsent(session, entry.row());
+                break;
             default:
                 throw new AssertionError(String.format(
                           "Unsupported mutate action: %s", item.action()));
diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java
index f872b6f56..727513334 100644
--- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java
+++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java
@@ -39,6 +39,7 @@ import com.baidu.hugegraph.backend.page.PageState;
 import com.baidu.hugegraph.backend.query.Aggregate;
 import com.baidu.hugegraph.backend.query.Condition;
 import com.baidu.hugegraph.backend.query.ConditionQuery;
+import com.baidu.hugegraph.backend.query.IdQuery;
 import com.baidu.hugegraph.backend.query.Query;
 import com.baidu.hugegraph.backend.store.BackendEntry;
 import com.baidu.hugegraph.backend.store.BackendTable;
@@ -48,6 +49,8 @@ import com.baidu.hugegraph.backend.store.mysql.MysqlEntryIterator.PagePosition;
 import com.baidu.hugegraph.backend.store.mysql.MysqlSessions.Session;
 import com.baidu.hugegraph.exception.NotFoundException;
 import com.baidu.hugegraph.iterator.ExtendableIterator;
+import com.baidu.hugegraph.iterator.WrappedIterator;
+import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.type.define.HugeKeys;
 import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.Log;
@@ -328,6 +331,17 @@ public abstract class MysqlTable
         this.delete(session, entry);
     }
 
+    @Override
+    public boolean queryExist(Session session, MysqlBackendEntry.Row entry) {
+        Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id());
+        Iterator<BackendEntry> iter = this.query(session, query);
+        try {
+            return iter.hasNext();
+        } finally {
+            WrappedIterator.close(iter);
+        }
+    }
+
     @Override
     public Number queryNumber(Session session, Query query) {
         Aggregate aggregate = query.aggregateNotNull();
@@ -353,8 +367,8 @@ public abstract class MysqlTable
     }
 
     protected <R> Iterator<R> query(Session session, Query query,
-                                    BiFunction<Query, ResultSetWrapper, Iterator<R>>
-                                    parser) {
+                                    BiFunction<Query, ResultSetWrapper,
+                                               Iterator<R>> parser) {
         ExtendableIterator<R> rs = new ExtendableIterator<>();
 
         if (query.limit() == 0L && !query.noLimit()) {