You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/10/27 23:08:51 UTC

[GitHub] [ignite-3] vldpyatkov commented on a change in pull request #400: IGNITE-15085

vldpyatkov commented on a change in pull request #400:
URL: https://github.com/apache/ignite-3/pull/400#discussion_r736581926



##########
File path: modules/api/src/main/java/org/apache/ignite/tx/TransactionException.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx;
+
+import org.apache.ignite.lang.IgniteException;
+
+/** */
+public class TransactionException extends IgniteException {

Review comment:
       Description required here.

##########
File path: modules/api/src/main/java/org/apache/ignite/lang/IgniteCheckedException.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * General Ignite exception which requires handling. This exception is used to indicate any error condition within the node.
+ */
+public class IgniteCheckedException extends Exception {

Review comment:
       Why this exception is better than IgniteInternalCheckedException?

##########
File path: modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishRequest.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Submit an action to a replication group.
+ */
+@Transferable(value = TxMessageGroup.TX_FINISH_REQUEST, autoSerializable = false)
+public interface TxFinishRequest extends NetworkMessage, Serializable {
+    /**
+     * @return The timestamp.
+     */
+    Timestamp timestamp();
+
+    /**
+     * @return {@code True} to commit.
+     */
+    boolean commit();
+
+    /**
+     * @return Enlisted partition groups.
+     */
+    Set<String> partitions();

Review comment:
       Think about different name, because partitions, it is a number which mean portions count.

##########
File path: modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
##########
@@ -68,7 +68,7 @@
     /**
      * Maximum number of tasks that can be applied in a batch
      */
-    private int applyBatch = 32;
+    private int applyBatch = 1;

Review comment:
       Return it to default (the WA was already applied in IGNITE-15696)

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -141,151 +151,183 @@ public InternalTableImpl(
         this.schemaMode = schemaMode;
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<BinaryRow> get(BinaryRow keyRow, Transaction tx) {
-        return partitionMap.get(partId(keyRow)).<SingleRowResponse>run(new GetCommand(keyRow))
-            .thenApply(SingleRowResponse::getValue);
-    }
+    /**
+     * @param keyRows Rows.
+     * @param tx The transaction.
+     * @param op Command factory.
+     * @param reducer The reducer.
+     * @param <R> Reducer's input.
+     * @param <T> Reducer's output.
+     * @return
+     */
+    private <R, T> CompletableFuture<T> wrapInTx(
+        Collection<BinaryRow> keyRows,
+        InternalTransaction tx,
+        BiFunction<Collection<BinaryRow>, InternalTransaction, Command> op,
+        Function<CompletableFuture<R>[], CompletableFuture<T>> reducer
+    ) {
+        if (tx == null) {
+            try {
+                tx = txManager.tx();
+            }
+            catch (TransactionException e) {
+                return failedFuture(e);
+            }
+        }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows, Transaction tx) {
-        Map<Integer, Set<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(keyRows);
+        final boolean implicit = tx == null;
+
+        final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
-        CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+        Map<Integer, List<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(keyRows);
+
+        CompletableFuture<R>[] futures = new CompletableFuture[keyRowsByPartition.size()];
 
         int batchNum = 0;
 
-        for (Map.Entry<Integer, Set<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
-            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new GetAllCommand(partToRows.getValue()));
+        for (Map.Entry<Integer, List<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+            CompletableFuture<RaftGroupService> fut = enlist(partToRows.getKey(), tx0);
 
-            batchNum++;
+            futures[batchNum++] = fut.thenCompose(svc -> svc.run(op.apply(partToRows.getValue(), tx0)));
         }
 
-        return collectMultiRowsResponses(futures);
-    }
+        CompletableFuture<T> fut = reducer.apply(futures);
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> upsert(BinaryRow row, Transaction tx) {
-        return partitionMap.get(partId(row)).run(new UpsertCommand(row));
+        return fut.handle(new BiFunction<T, Throwable, CompletableFuture<T>>() {
+            @Override public CompletableFuture<T> apply(T r, Throwable e) {
+                if (e != null)
+                    return tx0.rollbackAsync().thenCompose(ignored -> failedFuture(e)); // Preserve failed state.
+                else
+                    return implicit ? tx0.commitAsync().thenApply(ignored -> r) : completedFuture(r);
+            }
+        }).thenCompose(x -> x);
     }
 
-    /** {@inheritDoc} */
-    @Override public CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows, Transaction tx) {
-        Map<Integer, Set<BinaryRow>> keyRowsByPartition = mapRowsToPartitions(rows);
+    /**
+     * @param row The row.
+     * @param tx The transaction.
+     * @param op Command factory.
+     * @param trans Transform closure.
+     * @param <R> Transform input.
+     * @param <T> Transform output.
+     * @return
+     */
+    private <R, T> CompletableFuture<T> wrapInTx(
+        BinaryRow row,
+        InternalTransaction tx,
+        Function<InternalTransaction, Command> op,
+        Function<R, T> trans
+    ) {
+        if (tx == null) {
+            try {
+                tx = txManager.tx();
+            }
+            catch (TransactionException e) {
+                return failedFuture(e);
+            }
+        }
 
-        CompletableFuture<Void>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+        final boolean implicit = tx == null;
 
-        int batchNum = 0;
+        final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
 
-        for (Map.Entry<Integer, Set<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
-            futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new UpsertAllCommand(partToRows.getValue()));
+        int partId = partId(row);
 
-            batchNum++;
-        }
+        CompletableFuture<RaftGroupService> enlistFut = enlist(partId, tx0);
 
-        return CompletableFuture.allOf(futures);
+        CompletableFuture<T> fut = enlistFut.thenCompose(svc -> svc.<R>run(op.apply(tx0)).thenApply(trans::apply));
+
+        // TODO asch remove futures creation
+        return fut.handle(new BiFunction<T, Throwable, CompletableFuture<T>>() {
+            @Override public CompletableFuture<T> apply(T r, Throwable e) {
+                if (e != null)
+                    return tx0.rollbackAsync().thenCompose(ignored -> failedFuture(e)); // Preserve failed state.
+                else
+                    return implicit ? tx0.commitAsync().thenApply(ignored -> r) : completedFuture(r);
+            }
+        }).thenCompose(x -> x);

Review comment:
       It looks suspective (thenCompose(x -> x)) .




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org