You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/26 15:49:41 UTC

[07/14] ignite git commit: Moved platform transactions to Ignite.

Moved platform transactions to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/975f47ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/975f47ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/975f47ea

Branch: refs/heads/ignite-1124
Commit: 975f47eabdaba54ddf0974c2a2b7a2d13c6ea03c
Parents: 6c46e47
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Aug 26 15:07:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Aug 26 15:07:08 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/PortableContext.java      |   1 -
 .../transactions/PlatformTransactions.java      | 255 +++++++++++++++++++
 2 files changed, 255 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/975f47ea/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 83dd01d..2d3cbf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -206,7 +206,6 @@ public class PortableContext implements Externalizable {
 //
 //        registerPredefinedType(InteropClusterNode.class, 67);
 //        registerPredefinedType(InteropClusterMetrics.class, 68);
-//        registerPredefinedType(InteropTransactionMetrics.class, 69);
 //        registerPredefinedType(InteropMetadata.class, 70);
 //
 //        registerPredefinedType(InteropDotNetConfiguration.class, 71);

http://git-wip-us.apache.org/repos/asf/ignite/blob/975f47ea/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
new file mode 100644
index 0000000..fa63840
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.transactions;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Native transaction wrapper implementation.
+ */
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+public class PlatformTransactions extends PlatformAbstractTarget {
+    /** */
+    public static final int OP_CACHE_CONFIG_PARAMETERS = 1;
+
+    /** */
+    public static final int OP_METRICS = 2;
+
+    /** */
+    private final IgniteTransactions txs;
+
+    /** Map with currently active transactions. */
+    private final ConcurrentMap<Long, Transaction> txMap = GridConcurrentFactory.newMap();
+
+    /** Transaction ID sequence. Must be static to ensure uniqueness across different caches. */
+    private static final AtomicLong TX_ID_GEN = new AtomicLong();
+
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     */
+    public PlatformTransactions(PlatformContext interopCtx) {
+        super(interopCtx);
+
+        txs = interopCtx.kernalContext().grid().transactions();
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param isolation Isolation.
+     * @param timeout Timeout
+     * @param txSize Number of entries participating in transaction.
+     * @return Transaction thread ID.
+     */
+    public long txStart(int concurrency, int isolation, long timeout, int txSize) {
+        TransactionConcurrency txConcurrency = TransactionConcurrency.fromOrdinal(concurrency);
+
+        assert txConcurrency != null;
+
+        TransactionIsolation txIsolation = TransactionIsolation.fromOrdinal(isolation);
+
+        assert txIsolation != null;
+
+        Transaction tx = txs.txStart(txConcurrency, txIsolation);
+
+        return registerTx(tx);
+    }
+
+    /**
+     * @param id Transaction ID.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public int txCommit(long id) throws IgniteCheckedException {
+        tx(id).commit();
+
+        return txClose(id);
+    }
+
+    /**
+     * @param id Transaction ID.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public int txRollback(long id) throws IgniteCheckedException {
+        tx(id).rollback();
+
+        return txClose(id);
+    }
+
+    /**
+     * @param id Transaction ID.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     * @return Transaction state.
+     */
+    public int txClose(long id) throws IgniteCheckedException {
+        Transaction tx = tx(id);
+
+        try {
+            tx.close();
+
+            return tx.state().ordinal();
+        }
+        finally {
+            unregisterTx(id);
+        }
+    }
+
+    /**
+     * @param id Transaction ID.
+     * @return Transaction state.
+     */
+    public int txState(long id) {
+        Transaction tx = tx(id);
+
+        return tx.state().ordinal();
+    }
+
+    /**
+     * @param id Transaction ID.
+     * @return {@code True} if rollback only flag was set.
+     */
+    public boolean txSetRollbackOnly(long id) {
+        Transaction tx = tx(id);
+
+        return tx.setRollbackOnly();
+    }
+
+    /**
+     * Commits tx in async mode.
+     */
+    public void txCommitAsync(final long txId, final long futId) {
+        final Transaction asyncTx = (Transaction)tx(txId).withAsync();
+
+        asyncTx.commit();
+
+        listenAndNotifyIntFuture(futId, asyncTx);
+    }
+
+    /**
+     * Rolls back tx in async mode.
+     */
+    public void txRollbackAsync(final long txId, final long futId) {
+        final Transaction asyncTx = (Transaction)tx(txId).withAsync();
+
+        asyncTx.rollback();
+
+        listenAndNotifyIntFuture(futId, asyncTx);
+    }
+
+    /**
+     * Listens to the transaction future and notifies .NET int future.
+     */
+    private void listenAndNotifyIntFuture(final long futId, final Transaction asyncTx) {
+        IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() {
+            private static final long serialVersionUID = 0L;
+
+            @Override public Object apply(IgniteFuture fut) {
+                return null;
+            }
+        });
+
+        PlatformFutureUtils.listen(interopCtx, fut, futId, PlatformFutureUtils.TYP_OBJ);
+    }
+
+    /**
+     * Resets transaction metrics.
+     */
+    public void resetMetrics() {
+       txs.resetMetrics();
+    }
+
+    /**
+     * Register transaction.
+     *
+     * @param tx Transaction.
+     * @return Transaction ID.
+     */
+    private long registerTx(Transaction tx) {
+        long id = TX_ID_GEN.incrementAndGet();
+
+        Transaction old = txMap.put(id, tx);
+
+        assert old == null : "Duplicate TX ids: " + old;
+
+        return id;
+    }
+
+    /**
+     * Unregister transaction.
+     *
+     * @param id Transaction ID.
+     */
+    private void unregisterTx(long id) {
+        Transaction tx = txMap.remove(id);
+
+        assert tx != null : "Failed to unregister transaction: " + id;
+    }
+
+    /**
+     * Get transaction by ID.
+     *
+     * @param id ID.
+     * @return Transaction.
+     */
+    private Transaction tx(long id) {
+        Transaction tx = txMap.get(id);
+
+        assert tx != null : "Transaction not found for ID: " + id;
+
+        return tx;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_CACHE_CONFIG_PARAMETERS:
+                TransactionConfiguration txCfg = interopCtx.kernalContext().config().getTransactionConfiguration();
+
+                writer.writeEnum(txCfg.getDefaultTxConcurrency());
+                writer.writeEnum(txCfg.getDefaultTxIsolation());
+                writer.writeLong(txCfg.getDefaultTxTimeout());
+
+                break;
+
+            case OP_METRICS:
+                TransactionMetrics metrics = txs.metrics();
+
+                writer.writeDate(new Date(metrics.commitTime()));
+                writer.writeDate(new Date(metrics.rollbackTime()));
+                writer.writeInt(metrics.txCommits());
+                writer.writeInt(metrics.txRollbacks());
+
+                break;
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+}
\ No newline at end of file