You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/08/02 12:06:06 UTC

[ignite-3] 05/05: Add transactions. Fix tests.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-15212
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 8612fe86381e1e506ef4e39680c862f5e8a0036a
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Mon Aug 2 15:05:27 2021 +0300

    Add transactions.
    Fix tests.
---
 .../org/apache/ignite/query/sql/IgniteSql.java     | 67 ++--------------
 .../org/apache/ignite/query/sql/SqlColumnMeta.java |  5 +-
 .../org/apache/ignite/query/sql/SqlResultSet.java  |  5 ++
 .../query/sql/{IgniteSql.java => SqlSession.java}  | 42 ++++-------
 modules/sql/src/test/java/SqlTest.java             | 88 ++++++++++++++++------
 5 files changed, 92 insertions(+), 115 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java b/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
index e5a1d3a..11c5947 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
@@ -18,74 +18,18 @@
 package org.apache.ignite.query.sql;
 
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.query.sql.reactive.ReactiveSqlResultSet;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite SQL query facade.
  */
 // TODO: Do we wand a separate IgniteQuery facade for non-sql (index/scan/full-text) queries?
 public interface IgniteSql {
-    ////////////// Query execution.
     /**
-     * Shortcut method for running a query.
+     * Creates SQL session.
      *
-     * @param sql SQL query template.
-     * @param args Arguments for template (optional).
-     * @return SQL query resultset.
-     * @throws SQLException If failed.
+     * @return Session.
      */
-    SqlResultSet execute(@NotNull String sql, Object... args);
-
-    /**
-     * Shortcut method for running a query in asynchronously.
-     *
-     * @param sql SQL query template.
-     * @param args Arguments for template (optional)
-     * @return Query future.
-     * @throws SQLException If failed.
-     */
-    CompletableFuture<SqlResultSet> executeAsync(String sql, Object... args);
-    //TODO: May fut.cancel() cancels a query? If so, describe behavior in Javadoc.
-    //TODO: Cassandra API offers pagination API here, for manual fetching control. Is their AsyncResultSet ever usefull?
-
-    /**
-     * Shortcut method for running a query in asynchronously.
-     *
-     * @param sql SQL query template.
-     * @param args Arguments for template (optional)
-     * @return Reactive result.
-     * @throws SQLException If failed.
-     */
-    ReactiveSqlResultSet executeReactive(String sql, Object... args);
-
-    /**
-     * Shortcut method for running a non-query statement.
-     *
-     * @param sql SQL statement template.
-     * @param args Agruments for template (optional).
-     * @return Number of updated rows.
-     */
-    int executeNonQuery(@NotNull String sql, @Nullable Object... args);
-    //TODO: useful for bulk DML query, when we don't care of results.
-    //TODO: in contrary, execute() method may return inserted rows IDs that looks useful if AutoIncrement ID column is used.
-
-    //TODO: same methods for Statement.
-
-    /**
-     * Sets query session parameter.
-     *
-     * @param name Parameter name.
-     * @param value Parameter value.
-     */
-    void setParameter(String name, Object value);
-    //TODO: User can set e.g. queryTimeout or force join order or whatever.
-    //TODO: This is similar to SQL "SET" operator which is used in JDBC/ODBC clients for session state manipulation.
-
-
-    //TODO: Move all of this to Session. Maybe facade instance could incapsulate a session implicitely?
+    SqlSession session();
 
     /**
      * Kills query by its' id.
@@ -96,7 +40,7 @@ public interface IgniteSql {
 
     /**
      * Returns statistics facade for table statistics management.
-     *
+     * <p>
      * Table statistics are used by SQL engine for SQL queries planning.
      *
      * @return Statistics facade.
@@ -104,9 +48,10 @@ public interface IgniteSql {
     IgniteTableStatistics statistics();
     // TODO: Do we need this here or move to Table facade?
 
-
     void registerUserFunction(Class type, String... methodNames); //TODO: Get function details from method annotations.
+
     void registerUserFunction(Class type);
+
     void unregistedUserFunction(String functionName);
     //TODO: Custom function registration. Do we need a view and unregister functionality?
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java
index c7250a4..2576936 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlColumnMeta.java
@@ -31,16 +31,13 @@ public interface SqlColumnMeta {
      * @return Column name.
      */
     String name();
-    //TODO: do we expect a CanonicalName here?
-    //TODO: do we want a Table name for real column? Is Calcite supports this?
 
     /**
      * Returns column type.
      *
      * @return Column type.
      */
-    ColumnType type();
-    //TODO: do ever we want to expose ColumnType (NativeType) here? Is it useful or drop this?
+    ColumnType columnType();
 
     /**
      * Returns column value type.
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java
index 2bbadcf..ea8003d 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlResultSet.java
@@ -33,5 +33,10 @@ public interface SqlResultSet extends Iterable<SqlRow>, AutoCloseable {
      */
     SqlResultSetMeta metadata();
 
+    /**
+     * Returns query unique id.
+     *
+     * @return Query id.
+     */
     UUID queryId();
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlSession.java
similarity index 73%
copy from modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
copy to modules/api/src/main/java/org/apache/ignite/query/sql/SqlSession.java
index e5a1d3a..d9795c7 100644
--- a/modules/api/src/main/java/org/apache/ignite/query/sql/IgniteSql.java
+++ b/modules/api/src/main/java/org/apache/ignite/query/sql/SqlSession.java
@@ -17,18 +17,17 @@
 
 package org.apache.ignite.query.sql;
 
-import java.util.UUID;
+import java.sql.PreparedStatement;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.query.sql.reactive.ReactiveSqlResultSet;
+import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Ignite SQL query facade.
+ * SQL session.
  */
-// TODO: Do we wand a separate IgniteQuery facade for non-sql (index/scan/full-text) queries?
-public interface IgniteSql {
-    ////////////// Query execution.
+public interface SqlSession {
     /**
      * Shortcut method for running a query.
      *
@@ -72,42 +71,29 @@ public interface IgniteSql {
     //TODO: useful for bulk DML query, when we don't care of results.
     //TODO: in contrary, execute() method may return inserted rows IDs that looks useful if AutoIncrement ID column is used.
 
-    //TODO: same methods for Statement.
+    PreparedStatement preparedStatement(@NotNull String sql);
 
     /**
      * Sets query session parameter.
      *
      * @param name Parameter name.
      * @param value Parameter value.
+     * @return {@code this} for chaining.
      */
-    void setParameter(String name, Object value);
+    SqlSession setParameter(@NotNull String name, Object value);
     //TODO: User can set e.g. queryTimeout or force join order or whatever.
     //TODO: This is similar to SQL "SET" operator which is used in JDBC/ODBC clients for session state manipulation.
 
 
-    //TODO: Move all of this to Session. Maybe facade instance could incapsulate a session implicitely?
+    SqlSession withTransaction(Transaction tx);
+    //TODO: What happens with session if TX will commited/rolledback? Tx link can prevent garbage from being collected by GC.
+    //TODO: Can it be shared?
+    //TODO: Move to PreparedStatement/SqlQuery level?
 
     /**
-     * Kills query by its' id.
+     * Returns current transaction.
      *
-     * @param queryID Query id.
+     * @return Current transaction or null if a table is not enlisted in a transaction.
      */
-    void killQuery(UUID queryID);
-
-    /**
-     * Returns statistics facade for table statistics management.
-     *
-     * Table statistics are used by SQL engine for SQL queries planning.
-     *
-     * @return Statistics facade.
-     */
-    IgniteTableStatistics statistics();
-    // TODO: Do we need this here or move to Table facade?
-
-
-    void registerUserFunction(Class type, String... methodNames); //TODO: Get function details from method annotations.
-    void registerUserFunction(Class type);
-    void unregistedUserFunction(String functionName);
-    //TODO: Custom function registration. Do we need a view and unregister functionality?
+    @Nullable Transaction transaction();
 }
-
diff --git a/modules/sql/src/test/java/SqlTest.java b/modules/sql/src/test/java/SqlTest.java
index c49721a..503b1d8 100644
--- a/modules/sql/src/test/java/SqlTest.java
+++ b/modules/sql/src/test/java/SqlTest.java
@@ -23,8 +23,11 @@ import org.apache.ignite.query.sql.IgniteSql;
 import org.apache.ignite.query.sql.SqlResultSet;
 import org.apache.ignite.query.sql.SqlResultSetMeta;
 import org.apache.ignite.query.sql.SqlRow;
+import org.apache.ignite.query.sql.SqlSession;
 import org.apache.ignite.query.sql.reactive.ReactiveSqlResultSet;
 import org.apache.ignite.schema.ColumnType;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -46,6 +49,12 @@ public class SqlTest {
     @Mock
     IgniteSql queryMgr;
 
+    @Mock
+    private IgniteTransactions igniteTx;
+
+    @Mock
+    private Transaction tx;
+
     @BeforeEach
     void setUp() {
         initMock();
@@ -53,23 +62,35 @@ public class SqlTest {
 
     @Test
     public void testSynchronousSql() {
-        SqlResultSet rs = queryMgr.execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%");
+        igniteTx.runInTransaction(tx -> {
+            SqlSession sess = queryMgr.session().withTransaction(tx);
 
-        for (SqlRow r : rs) {
-            assertTrue(10 > r.longValue("id"));
-            assertTrue((r.stringValue("val")).startsWith("str"));
-        }
+            SqlResultSet rs = sess.execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%");
+
+            for (SqlRow r : rs) {
+                assertTrue(10 > r.longValue("id"));
+                assertTrue((r.stringValue("val")).startsWith("str"));
+            }
+
+            tx.commit();
+        });
+
+        Mockito.verify(tx).commit();
     }
 
     @Test
     public void testAsyncSql() {
-        queryMgr.executeAsync("SELECT id, val FROM table WHERE id == {};", 10)
-            .thenCompose(rs -> {
-                String str = rs.iterator().next().stringValue("val");
+        igniteTx.beginAsync().thenApply(tx -> queryMgr.session().withTransaction(tx))
+            .thenCompose(sess -> sess.executeAsync("SELECT id, val FROM table WHERE id == {};", 10)
+                .thenCompose(rs -> {
+                    String str = rs.iterator().next().stringValue("val");
 
-                return queryMgr.executeAsync("SELECT val FROM table where val LIKE {};", str);
-            })
-            .join();
+                    return sess.executeAsync("SELECT val FROM table where val LIKE {};", str);
+                })
+                .thenApply(ignore -> sess.transaction())
+            ).thenAccept(Transaction::commitAsync);
+
+        Mockito.verify(tx).commitAsync();
     }
 
     @Test
@@ -79,16 +100,22 @@ public class SqlTest {
             assertTrue(row.stringValue("val").startsWith("str"));
         });
 
-        queryMgr.executeReactive("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%")
-            .subscribe(subscriber);
+        igniteTx.beginAsync().thenApply(tx -> queryMgr.session().withTransaction(tx))
+            .thenApply(session -> {
+                session.executeReactive("SELECT id, val FROM table WHERE id < {} AND val LIKE {};", 10, "str%")
+                    .subscribe(subscriber);
+
+                return session.transaction();
+            })
+            .thenApply(Transaction::commitAsync);
 
-        subscriber.join();
+        Mockito.verify(tx).commitAsync();
     }
 
     @Disabled
     @Test
     public void testMetadata() {
-        SqlResultSet rs = queryMgr.execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {}; ", 10, "str%");
+        SqlResultSet rs = queryMgr.session().execute("SELECT id, val FROM table WHERE id < {} AND val LIKE {}; ", 10, "str%");
 
         SqlRow row = rs.iterator().next();
 
@@ -102,15 +129,22 @@ public class SqlTest {
         assertEquals("id", meta.column(0).name());
         assertEquals("val", meta.column(1).name());
 
-        assertEquals(ColumnType.INT64, meta.column(0).type());
-        assertEquals(ColumnType.string(), meta.column(1).type());
+        assertEquals(ColumnType.INT64, meta.column(0).columnType());
+        assertEquals(ColumnType.string(), meta.column(1).columnType());
 
         assertFalse(meta.column(0).nullable());
         assertTrue(meta.column(1).nullable());
     }
 
     private void initMock() {
-        Mockito.when(queryMgr.execute(Mockito.eq("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any())).
+        SqlSession session = Mockito.mock(SqlSession.class);
+
+        Mockito.when(queryMgr.session()).thenReturn(session);
+
+        Mockito.when(session.withTransaction(tx)).thenReturn(session);
+        Mockito.when(session.transaction()).thenReturn(tx);
+
+        Mockito.when(session.execute(Mockito.eq("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any())).
             thenAnswer(ans -> Mockito.when(Mockito.mock(SqlResultSet.class).iterator())
                 .thenReturn(List.of(
                     new TestRow().set("id", 1L).set("val", "string 1").build(),
@@ -118,7 +152,7 @@ public class SqlTest {
                     new TestRow().set("id", 5L).set("val", "string 3").build()
                 ).iterator()).getMock());
 
-        Mockito.when(queryMgr.executeAsync(Mockito.eq("SELECT id, val FROM table WHERE id == {};"), Mockito.any()))
+        Mockito.when(session.executeAsync(Mockito.eq("SELECT id, val FROM table WHERE id == {};"), Mockito.any()))
             .thenAnswer(ans -> {
                 Object mock = Mockito.when(Mockito.mock(SqlResultSet.class).iterator())
                     .thenReturn(List.of(new TestRow().set("id", 1L).set("val", "string 1").build()).iterator())
@@ -127,7 +161,7 @@ public class SqlTest {
                 return CompletableFuture.completedFuture(mock);
             });
 
-        Mockito.when(queryMgr.executeAsync(Mockito.eq("SELECT val FROM table where val LIKE {};"), Mockito.any()))
+        Mockito.when(session.executeAsync(Mockito.eq("SELECT val FROM table where val LIKE {};"), Mockito.any()))
             .thenAnswer(ans -> {
                 Object mock = Mockito.when(Mockito.mock(SqlResultSet.class).iterator())
                     .thenReturn(List.of(new TestRow().set("id", 10L).set("val", "string 10").build()).iterator())
@@ -136,7 +170,7 @@ public class SqlTest {
                 return CompletableFuture.completedFuture(mock);
             });
 
-        Mockito.when(queryMgr.executeReactive(Mockito.startsWith("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any()))
+        Mockito.when(session.executeReactive(Mockito.startsWith("SELECT id, val FROM table WHERE id < {} AND val LIKE {};"), Mockito.any()))
             .thenAnswer(invocation -> {
                 ReactiveSqlResultSet mock = Mockito.mock(ReactiveSqlResultSet.class);
 
@@ -158,12 +192,22 @@ public class SqlTest {
 
                 return mock;
             });
+
+        Mockito.doAnswer(invocation -> {
+            Consumer<Transaction> argument = invocation.getArgument(0);
+
+            argument.accept(tx);
+
+            return null;
+        }).when(igniteTx).runInTransaction(Mockito.any());
+
+        Mockito.when(igniteTx.beginAsync()).thenReturn(CompletableFuture.completedFuture(tx));
     }
 
     /**
      * Dummy subsctiber for test purposes.
      */
-     static class SqlRowSubscriber extends CompletableFuture implements Flow.Subscriber<SqlRow> {
+    static class SqlRowSubscriber extends CompletableFuture implements Flow.Subscriber<SqlRow> {
         private Consumer<SqlRow> rowConsumer;
 
         SqlRowSubscriber(Consumer<SqlRow> rowConsumer) {