You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hugegraph.apache.org by je...@apache.org on 2022/06/06 03:31:16 UTC
[incubator-hugegraph] 02/05: implement for other backends
This is an automated email from the ASF dual-hosted git repository.
jermy pushed a commit to branch schema-update-bug-rebase
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit ce5da349f3fd29cbfbcd19d487631055e5cb37fc
Author: Zhangmei Li <li...@baidu.com>
AuthorDate: Mon Feb 28 20:11:00 2022 +0800
implement for other backends
Change-Id: Iab9ab42ba0504664ce3320c8905b2d92d01d849a
---
.../backend/store/cassandra/CassandraStore.java | 54 +++++++++++++---------
.../backend/store/cassandra/CassandraTable.java | 14 ++++++
.../hugegraph/backend/store/BackendTable.java | 7 ++-
.../hugegraph/backend/store/hbase/HbaseStore.java | 6 +++
.../hugegraph/backend/store/hbase/HbaseTable.java | 8 ++++
.../hugegraph/backend/store/mysql/MysqlStore.java | 6 +++
.../hugegraph/backend/store/mysql/MysqlTable.java | 18 +++++++-
7 files changed, 85 insertions(+), 28 deletions(-)
diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java
index 75bf12f0c..147664312 100644
--- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java
+++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java
@@ -227,17 +227,25 @@ public abstract class CassandraStore
LOG.warn("The entry will be ignored due to no change: {}", entry);
}
+ CassandraTable table;
+ if (!entry.olap()) {
+ // Oltp table
+ table = this.table(entry.type());
+ } else {
+ if (entry.type().isIndex()) {
+ // Olap index
+ table = this.table(this.olapTableName(entry.type()));
+ } else {
+ // Olap vertex
+ table = this.table(this.olapTableName(entry.subId()));
+ }
+ }
+
switch (item.action()) {
case INSERT:
- // Insert olap vertex
- if (entry.olap()) {
- this.table(this.olapTableName(entry.subId()))
- .insert(session, entry.row());
- break;
- }
// Insert entry
if (entry.selfChanged()) {
- this.table(entry.type()).insert(session, entry.row());
+ table.insert(session, entry.row());
}
// Insert sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
@@ -245,15 +253,9 @@ public abstract class CassandraStore
}
break;
case DELETE:
- // Delete olap vertex index by index label
- if (entry.olap()) {
- this.table(this.olapTableName(entry.type()))
- .delete(session, entry.row());
- break;
- }
// Delete entry
if (entry.selfChanged()) {
- this.table(entry.type()).delete(session, entry.row());
+ table.delete(session, entry.row());
}
// Delete sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
@@ -261,15 +263,9 @@ public abstract class CassandraStore
}
break;
case APPEND:
- // Append olap vertex index
- if (entry.olap()) {
- this.table(this.olapTableName(entry.type()))
- .append(session, entry.row());
- break;
- }
// Append entry
if (entry.selfChanged()) {
- this.table(entry.type()).append(session, entry.row());
+ table.append(session, entry.row());
}
// Append sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
@@ -279,13 +275,27 @@ public abstract class CassandraStore
case ELIMINATE:
// Eliminate entry
if (entry.selfChanged()) {
- this.table(entry.type()).eliminate(session, entry.row());
+ table.eliminate(session, entry.row());
}
// Eliminate sub rows (edges)
for (CassandraBackendEntry.Row row : entry.subRows()) {
this.table(row.type()).eliminate(session, row);
}
break;
+ case UPDATE_IF_PRESENT:
+ if (entry.selfChanged()) {
+ // TODO: forward to master-writer node
+ table.updateIfPresent(session, entry.row());
+ }
+ assert entry.subRows().isEmpty() : entry.subRows();
+ break;
+ case UPDATE_IF_ABSENT:
+ if (entry.selfChanged()) {
+ // TODO: forward to master-writer node
+ table.updateIfAbsent(session, entry.row());
+ }
+ assert entry.subRows().isEmpty() : entry.subRows();
+ break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java
index 548726b4a..f9d65fe0f 100644
--- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java
+++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java
@@ -38,6 +38,7 @@ import com.baidu.hugegraph.backend.page.PageState;
import com.baidu.hugegraph.backend.query.Aggregate;
import com.baidu.hugegraph.backend.query.Condition;
import com.baidu.hugegraph.backend.query.Condition.Relation;
+import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.query.Query.Order;
import com.baidu.hugegraph.backend.store.BackendEntry;
@@ -46,6 +47,7 @@ import com.baidu.hugegraph.backend.store.Shard;
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
+import com.baidu.hugegraph.iterator.WrappedIterator;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.CopyUtil;
@@ -97,6 +99,18 @@ public abstract class CassandraTable
});
}
+ @Override
+ public boolean queryExist(CassandraSessionPool.Session session,
+ CassandraBackendEntry.Row entry) {
+ Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id());
+ Iterator<BackendEntry> iter = this.query(session, query);
+ try {
+ return iter.hasNext();
+ } finally {
+ WrappedIterator.close(iter);
+ }
+ }
+
@Override
public Number queryNumber(CassandraSessionPool.Session session,
Query query) {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java
index f15f97dc7..9753076eb 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java
@@ -69,6 +69,7 @@ public abstract class BackendTable<Session extends BackendSession, Entry> {
public void updateIfPresent(Session session, Entry entry) {
// TODO: use fine-grained row lock
synchronized (this.table) {
+ assert !session.hasChanges();
if (this.queryExist(session, entry)) {
this.insert(session, entry);
if (session != null) {
@@ -81,6 +82,7 @@ public abstract class BackendTable<Session extends BackendSession, Entry> {
public void updateIfAbsent(Session session, Entry entry) {
// TODO: use fine-grained row lock
synchronized (this.table) {
+ assert !session.hasChanges();
if (!this.queryExist(session, entry)) {
this.insert(session, entry);
if (session != null) {
@@ -136,10 +138,7 @@ public abstract class BackendTable<Session extends BackendSession, Entry> {
public abstract Number queryNumber(Session session, Query query);
-// public abstract boolean queryExist(Session session, Entry entry);
- public boolean queryExist(Session session, Entry entry) {
- return false;
- }
+ public abstract boolean queryExist(Session session, Entry entry);
public abstract void insert(Session session, Entry entry);
diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java
index 628048a5d..9cd5c233a 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java
@@ -215,6 +215,12 @@ public abstract class HbaseStore extends AbstractBackendStore<Session> {
case ELIMINATE:
table.eliminate(session, entry);
break;
+ case UPDATE_IF_PRESENT:
+ table.updateIfPresent(session, entry);
+ break;
+ case UPDATE_IF_ABSENT:
+ table.updateIfAbsent(session, entry);
+ break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
index 3cca9a5dc..b4009dd9c 100644
--- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
+++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java
@@ -144,6 +144,14 @@ public class HbaseTable extends BackendTable<Session, BackendEntry> {
this.delete(session, entry);
}
+ @Override
+ public boolean queryExist(Session session, BackendEntry entry) {
+ Id id = entry.id();
+ try (RowIterator iter = this.queryById(session, id)) {
+ return iter.hasNext();
+ }
+ }
+
@Override
public Number queryNumber(Session session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java
index 858b22edb..90318c8db 100644
--- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java
+++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java
@@ -260,6 +260,12 @@ public abstract class MysqlStore extends AbstractBackendStore<Session> {
case ELIMINATE:
table.eliminate(session, entry.row());
break;
+ case UPDATE_IF_PRESENT:
+ table.updateIfPresent(session, entry.row());
+ break;
+ case UPDATE_IF_ABSENT:
+ table.updateIfAbsent(session, entry.row());
+ break;
default:
throw new AssertionError(String.format(
"Unsupported mutate action: %s", item.action()));
diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java
index f872b6f56..727513334 100644
--- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java
+++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java
@@ -39,6 +39,7 @@ import com.baidu.hugegraph.backend.page.PageState;
import com.baidu.hugegraph.backend.query.Aggregate;
import com.baidu.hugegraph.backend.query.Condition;
import com.baidu.hugegraph.backend.query.ConditionQuery;
+import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendTable;
@@ -48,6 +49,8 @@ import com.baidu.hugegraph.backend.store.mysql.MysqlEntryIterator.PagePosition;
import com.baidu.hugegraph.backend.store.mysql.MysqlSessions.Session;
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
+import com.baidu.hugegraph.iterator.WrappedIterator;
+import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
@@ -328,6 +331,17 @@ public abstract class MysqlTable
this.delete(session, entry);
}
+ @Override
+ public boolean queryExist(Session session, MysqlBackendEntry.Row entry) {
+ Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id());
+ Iterator<BackendEntry> iter = this.query(session, query);
+ try {
+ return iter.hasNext();
+ } finally {
+ WrappedIterator.close(iter);
+ }
+ }
+
@Override
public Number queryNumber(Session session, Query query) {
Aggregate aggregate = query.aggregateNotNull();
@@ -353,8 +367,8 @@ public abstract class MysqlTable
}
protected <R> Iterator<R> query(Session session, Query query,
- BiFunction<Query, ResultSetWrapper, Iterator<R>>
- parser) {
+ BiFunction<Query, ResultSetWrapper,
+ Iterator<R>> parser) {
ExtendableIterator<R> rs = new ExtendableIterator<>();
if (query.limit() == 0L && !query.noLimit()) {