You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/08/02 12:06:06 UTC
[ignite-3] 05/05: Add transactions. Fix tests.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-15212
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 8612fe86381e1e506ef4e39680c862f5e8a0036a
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Mon Aug 2 15:05:27 2021 +0300
Add transactions.
Fix tests.
---
.../org/apache/ignite/query/sql/IgniteSql.java | 67 ++--------------
.../org/apache/ignite/query/sql/SqlColumnMeta.java | 5 +-
.../org/apache/ignite/query/sql/SqlResultSet.java | 5 ++
.../query/sql/{IgniteSql.java => SqlSession.java} | 42 ++++-------
modules/sql/src/test/java/SqlTest.java | 88 ++++++++++++++++------
5 files changed, 92 insertions(+), 115 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java b/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
index e5a1d3a..11c5947 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
@@ -18,74 +18,18 @@
package org.apache.ignite.query.sql;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.query.sql.reactive.ReactiveSqlResultSet;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
/**
* Ignite SQL query facade.
*/
// TODO: Do we wand a separate IgniteQuery facade for non-sql (index/scan/full-text) queries?
public interface IgniteSql {
- ////////////// Query execution.
/**
- * Shortcut method for running a query.
+ * Creates SQL session.
*
- * @param sql SQL query template.
- * @param args Arguments for template (optional).
- * @return SQL query resultset.
- * @throws SQLException If failed.
+ * @return Session.
*/
- SqlResultSet execute(@NotNull String sql, Object... args);
-
- /**
- * Shortcut method for running a query in asynchronously.
- *
- * @param sql SQL query template.
- * @param args Arguments for template (optional)
- * @return Query future.
- * @throws SQLException If failed.
- */
- CompletableFuture<SqlResultSet> executeAsync(String sql, Object... args);
- //TODO: May fut.cancel() cancels a query? If so, describe behavior in Javadoc.
- //TODO: Cassandra API offers pagination API here, for manual fetching control. Is their AsyncResultSet ever usefull?
-
- /**
- * Shortcut method for running a query in asynchronously.
- *
- * @param sql SQL query template.
- * @param args Arguments for template (optional)
- * @return Reactive result.
- * @throws SQLException If failed.
- */
- ReactiveSqlResultSet executeReactive(String sql, Object... args);
-
- /**
- * Shortcut method for running a non-query statement.
- *
- * @param sql SQL statement template.
- * @param args Agruments for template (optional).
- * @return Number of updated rows.
- */
- int executeNonQuery(@NotNull String sql, @Nullable Object... args);
- //TODO: useful for bulk DML query, when we don't care of results.
- //TODO: in contrary, execute() method may return inserted rows IDs that looks useful if AutoIncrement ID column is used.
-
- //TODO: same methods for Statement.
-
- /**
- * Sets query session parameter.
- *
- * @param name Parameter name.
- * @param value Parameter value.
- */
- void setParameter(String name, Object value);
- //TODO: User can set e.g. queryTimeout or force join order or whatever.
- //TODO: This is similar to SQL "SET" operator which is used in JDBC/ODBC clients for session state manipulation.
-
-
- //TODO: Move all of this to Session. Maybe facade instance could incapsulate a session implicitely?
+ SqlSession session();
/**
* Kills query by its' id.
@@ -96,7 +40,7 @@ public interface IgniteSql {
/**
* Returns statistics facade for table statistics management.
- *
+ * <p>
* Table statistics are used by SQL engine for SQL queries planning.
*
* @return Statistics facade.
@@ -104,9 +48,10 @@ public interface IgniteSql {
IgniteTableStatistics statistics();
// TODO: Do we need this here or move to Table facade?
-
void registerUserFunction(Class type, String... methodNames); //TODO: Get function details from method annotations.
+
void registerUserFunction(Class type);
+
void unregistedUserFunction(String functionName);
//TODO: Custom function registration. Do we need a view and unregister functionality?
}
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java
index c7250a4..2576936 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java
@@ -31,16 +31,13 @@ public interface SqlColumnMeta {
* @return Column name.
*/
String name();
- //TODO: do we expect a CanonicalName here?
- //TODO: do we want a Table name for real column? Is Calcite supports this?
/**
* Returns column type.
*
* @return Column type.
*/
- ColumnType type();
- //TODO: do ever we want to expose ColumnType (NativeType) here? Is it useful or drop this?
+ ColumnType columnType();
/**
* Returns column value type.
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java
index 2bbadcf..ea8003d 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java
@@ -33,5 +33,10 @@ public interface SqlResultSet extends Iterable<SqlRow>, AutoCloseable {
*/
SqlResultSetMeta metadata();
+ /**
+ * Returns query unique id.
+ *
+ * @return Query id.
+ */
UUID queryId();
}
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlSession.java
similarity index 73%
copy from modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
copy to modules/api/src/main/java/org/apache/ignite/query/sql/SqlSession.java
index e5a1d3a..d9795c7 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlSession.java
@@ -17,18 +17,17 @@
package org.apache.ignite.query.sql;
-import java.util.UUID;
+import java.sql.PreparedStatement;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.query.sql.reactive.ReactiveSqlResultSet;
+import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Ignite SQL query facade.
+ * SQL session.
*/
-// TODO: Do we wand a separate IgniteQuery facade for non-sql (index/scan/full-text) queries?
-public interface IgniteSql {
- ////////////// Query execution.
+public interface SqlSession {
/**
* Shortcut method for running a query.
*
@@ -72,42 +71,29 @@ public interface IgniteSql {
//TODO: useful for bulk DML query, when we don't care of results.
//TODO: in contrary, execute() method may return inserted rows IDs that looks useful if AutoIncrement ID column is used.
- //TODO: same methods for Statement.
+ PreparedStatement preparedStatement(@NotNull String sql);
/**
* Sets query session parameter.
*
* @param name Parameter name.
* @param value Parameter value.
+ * @return {@code this} for chaining.
*/
- void setParameter(String name, Object value);
+ SqlSession setParameter(@NotNull String name, Object value);
//TODO: User can set e.g. queryTimeout or force join order or whatever.
//TODO: This is similar to SQL "SET" operator which is used in JDBC/ODBC clients for session state manipulation.
- //TODO: Move all of this to Session. Maybe facade instance could incapsulate a session implicitely?
+ SqlSession withTransaction(Transaction tx);
+ //TODO: What happens to the session if the TX is committed/rolled back? The Tx link can prevent garbage from being collected by GC.
+ //TODO: Can it be shared?
+ //TODO: Move to PreparedStatement/SqlQuery level?
/**
- * Kills query by its' id.
+ * Returns current transaction.
*
- * @param queryID Query id.
+ * @return Current transaction or {@code null} if the session is not enlisted in a transaction.
*/
- void killQuery(UUID queryID);
-
- /**
- * Returns statistics facade for table statistics management.
- *
- * Table statistics are used by SQL engine for SQL queries planning.
- *
- * @return Statistics facade.
- */
- IgniteTableStatistics statistics();
- // TODO: Do we need this here or move to Table facade?
-
-
- void registerUserFunction(Class type, String... methodNames); //TODO: Get function details from method annotations.
- void registerUserFunction(Class type);
- void unregistedUserFunction(String functionName);
- //TODO: Custom function registration. Do we need a view and unregister functionality?
+ @Nullable Transaction transaction();
}
-
diff --git a/modules/sql/src/test/java/SqlTest.java b/modules/sql/src/test/java/SqlTest.java
index c49721a..503b1d8 100644
--- a/modules/sql/src/test/java/SqlTest.java
+++ b/modules/sql/src/test/java/SqlTest.java
@@ -23,8 +23,11 @@ import org.apache.ignite.query.sql.IgniteSql;
import org.apache.ignite.query.sql.SqlResultSet;
import org.apache.ignite.query.sql.SqlResultSetMeta;
import org.apache.ignite.query.sql.SqlRow;
+import org.apache.ignite.query.sql.SqlSession;
import org.apache.ignite.query.sql.reactive.ReactiveSqlResultSet;
import org.apache.ignite.schema.ColumnType;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -46,6 +49,12 @@ public class SqlTest {
@Mock
IgniteSql queryMgr;
+ @Mock
+ private IgniteTransactions igniteTx;
+
+ @Mock
+ private Transaction tx;
+
@BeforeEach
void setUp() {
initMock();
@@ -53,23 +62,35 @@ public class SqlTest {
@Test
public void testSynchronousSql() {
- SqlResultSet rs = queryMgr.execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%");
+ igniteTx.runInTransaction(tx -> {
+ SqlSession sess = queryMgr.session().withTransaction(tx);
- for (SqlRow r : rs) {
- assertTrue(10 > r.longValue("id"));
- assertTrue((r.stringValue("val")).startsWith("str"));
- }
+ SqlResultSet rs = sess.execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%");
+
+ for (SqlRow r : rs) {
+ assertTrue(10 > r.longValue("id"));
+ assertTrue((r.stringValue("val")).startsWith("str"));
+ }
+
+ tx.commit();
+ });
+
+ Mockito.verify(tx).commit();
}
@Test
public void testAsyncSql() {
- queryMgr.executeAsync("SELECT id, val FROM table WHERE id == {};", 10)
- .thenCompose(rs -> {
- String str = rs.iterator().next().stringValue("val");
+ igniteTx.beginAsync().thenApply(tx -> queryMgr.session().withTransaction(tx))
+ .thenCompose(sess -> sess.executeAsync("SELECT id, val FROM table WHERE id == {};", 10)
+ .thenCompose(rs -> {
+ String str = rs.iterator().next().stringValue("val");
- return queryMgr.executeAsync("SELECT val FROM table where val LIKE {};", str);
- })
- .join();
+ return sess.executeAsync("SELECT val FROM table where val LIKE {};", str);
+ })
+ .thenApply(ignore -> sess.transaction())
+ ).thenAccept(Transaction::commitAsync);
+
+ Mockito.verify(tx).commitAsync();
}
@Test
@@ -79,16 +100,22 @@ public class SqlTest {
assertTrue(row.stringValue("val").startsWith("str"));
});
- queryMgr.executeReactive("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%")
- .subscribe(subscriber);
+ igniteTx.beginAsync().thenApply(tx -> queryMgr.session().withTransaction(tx))
+ .thenApply(session -> {
+ session.executeReactive("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%")
+ .subscribe(subscriber);
+
+ return session.transaction();
+ })
+ .thenApply(Transaction::commitAsync);
- subscriber.join();
+ Mockito.verify(tx).commitAsync();
}
@Disabled
@Test
public void testMetadata() {
- SqlResultSet rs = queryMgr.execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {}; ", 10, "str%");
+ SqlResultSet rs = queryMgr.session().execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {}; ", 10, "str%");
SqlRow row = rs.iterator().next();
@@ -102,15 +129,22 @@ public class SqlTest {
assertEquals("id", meta.column(0).name());
assertEquals("val", meta.column(1).name());
- assertEquals(ColumnType.INT64, meta.column(0).type());
- assertEquals(ColumnType.string(), meta.column(1).type());
+ assertEquals(ColumnType.INT64, meta.column(0).columnType());
+ assertEquals(ColumnType.string(), meta.column(1).columnType());
assertFalse(meta.column(0).nullable());
assertTrue(meta.column(1).nullable());
}
private void initMock() {
- Mockito.when(queryMgr.execute(Mockito.eq("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any())).
+ SqlSession session = Mockito.mock(SqlSession.class);
+
+ Mockito.when(queryMgr.session()).thenReturn(session);
+
+ Mockito.when(session.withTransaction(tx)).thenReturn(session);
+ Mockito.when(session.transaction()).thenReturn(tx);
+
+ Mockito.when(session.execute(Mockito.eq("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any())).
thenAnswer(ans -> Mockito.when(Mockito.mock(SqlResultSet.class).iterator())
.thenReturn(List.of(
new TestRow().set("id", 1L).set("val", "string 1").build(),
@@ -118,7 +152,7 @@ public class SqlTest {
new TestRow().set("id", 5L).set("val", "string 3").build()
).iterator()).getMock());
- Mockito.when(queryMgr.executeAsync(Mockito.eq("SELECT id, val FROM table WHERE id == {};"), Mockito.any()))
+ Mockito.when(session.executeAsync(Mockito.eq("SELECT id, val FROM table WHERE id == {};"), Mockito.any()))
.thenAnswer(ans -> {
Object mock = Mockito.when(Mockito.mock(SqlResultSet.class).iterator())
.thenReturn(List.of(new TestRow().set("id", 1L).set("val", "string 1").build()).iterator())
@@ -127,7 +161,7 @@ public class SqlTest {
return CompletableFuture.completedFuture(mock);
});
- Mockito.when(queryMgr.executeAsync(Mockito.eq("SELECT val FROM table where val LIKE {};"), Mockito.any()))
+ Mockito.when(session.executeAsync(Mockito.eq("SELECT val FROM table where val LIKE {};"), Mockito.any()))
.thenAnswer(ans -> {
Object mock = Mockito.when(Mockito.mock(SqlResultSet.class).iterator())
.thenReturn(List.of(new TestRow().set("id", 10L).set("val", "string 10").build()).iterator())
@@ -136,7 +170,7 @@ public class SqlTest {
return CompletableFuture.completedFuture(mock);
});
- Mockito.when(queryMgr.executeReactive(Mockito.startsWith("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any()))
+ Mockito.when(session.executeReactive(Mockito.startsWith("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any()))
.thenAnswer(invocation -> {
ReactiveSqlResultSet mock = Mockito.mock(ReactiveSqlResultSet.class);
@@ -158,12 +192,22 @@ public class SqlTest {
return mock;
});
+
+ Mockito.doAnswer(invocation -> {
+ Consumer<Transaction> argument = invocation.getArgument(0);
+
+ argument.accept(tx);
+
+ return null;
+ }).when(igniteTx).runInTransaction(Mockito.any());
+
+ Mockito.when(igniteTx.beginAsync()).thenReturn(CompletableFuture.completedFuture(tx));
}
/**
* Dummy subscriber for test purposes.
*/
- static class SqlRowSubscriber extends CompletableFuture implements Flow.Subscriber<SqlRow> {
+ static class SqlRowSubscriber extends CompletableFuture implements Flow.Subscriber<SqlRow> {
private Consumer<SqlRow> rowConsumer;
SqlRowSubscriber(Consumer<SqlRow> rowConsumer) {