You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/26 15:49:41 UTC
[07/14] ignite git commit: Moved platform transactions to Ignite.
Moved platform transactions to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/975f47ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/975f47ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/975f47ea
Branch: refs/heads/ignite-1124
Commit: 975f47eabdaba54ddf0974c2a2b7a2d13c6ea03c
Parents: 6c46e47
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Aug 26 15:07:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Aug 26 15:07:08 2015 +0300
----------------------------------------------------------------------
.../internal/portable/PortableContext.java | 1 -
.../transactions/PlatformTransactions.java | 255 +++++++++++++++++++
2 files changed, 255 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/975f47ea/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 83dd01d..2d3cbf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -206,7 +206,6 @@ public class PortableContext implements Externalizable {
//
// registerPredefinedType(InteropClusterNode.class, 67);
// registerPredefinedType(InteropClusterMetrics.class, 68);
-// registerPredefinedType(InteropTransactionMetrics.class, 69);
// registerPredefinedType(InteropMetadata.class, 70);
//
// registerPredefinedType(InteropDotNetConfiguration.class, 71);
http://git-wip-us.apache.org/repos/asf/ignite/blob/975f47ea/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
new file mode 100644
index 0000000..fa63840
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.transactions;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Native transaction wrapper implementation.
+ */
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+public class PlatformTransactions extends PlatformAbstractTarget {
+ /** Out-operation code: writes default transaction concurrency, isolation and timeout. */
+ public static final int OP_CACHE_CONFIG_PARAMETERS = 1;
+
+ /** Out-operation code: writes transaction metrics. */
+ public static final int OP_METRICS = 2;
+
+ /** Transactions facade obtained from the grid in the constructor. */
+ private final IgniteTransactions txs;
+
+ /** Map with currently active transactions, keyed by the ID generated on registration. */
+ private final ConcurrentMap<Long, Transaction> txMap = GridConcurrentFactory.newMap();
+
+ /** Transaction ID sequence. Must be static to ensure uniqueness across different caches. */
+ private static final AtomicLong TX_ID_GEN = new AtomicLong();
+
+ /**
+ * Creates the transactions target and resolves the transactions facade from the grid.
+ *
+ * @param interopCtx Interop context.
+ */
+ public PlatformTransactions(PlatformContext interopCtx) {
+ super(interopCtx);
+
+ this.txs = interopCtx.kernalContext().grid().transactions();
+ }
+
+ /**
+ * Starts a transaction with the requested parameters and registers it in the map of active transactions.
+ *
+ * @param concurrency Ordinal of the transaction concurrency.
+ * @param isolation Ordinal of the transaction isolation.
+ * @param timeout Timeout.
+ * @param txSize Number of entries participating in transaction.
+ * @return Transaction ID under which the started transaction is registered.
+ */
+ public long txStart(int concurrency, int isolation, long timeout, int txSize) {
+ TransactionConcurrency txConcurrency = TransactionConcurrency.fromOrdinal(concurrency);
+
+ assert txConcurrency != null;
+
+ TransactionIsolation txIsolation = TransactionIsolation.fromOrdinal(isolation);
+
+ assert txIsolation != null;
+
+ // Forward timeout and size as well; the two-argument overload would silently ignore the caller's values.
+ Transaction tx = txs.txStart(txConcurrency, txIsolation, timeout, txSize);
+
+ return registerTx(tx);
+ }
+
+ /**
+ * @param id Transaction ID.
+ * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ */
+ public int txCommit(long id) throws IgniteCheckedException {
+ tx(id).commit();
+
+ return txClose(id);
+ }
+
+ /**
+ * @param id Transaction ID.
+ * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ */
+ public int txRollback(long id) throws IgniteCheckedException {
+ tx(id).rollback();
+
+ return txClose(id);
+ }
+
+ /**
+ * Closes the transaction registered under the given ID. The transaction is removed from the
+ * map of active transactions in any case, including when closing throws.
+ *
+ * @param id Transaction ID.
+ * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @return Ordinal of the transaction state read after the close call.
+ */
+ public int txClose(long id) throws IgniteCheckedException {
+ Transaction target = tx(id);
+
+ int stateOrdinal;
+
+ try {
+ target.close();
+
+ stateOrdinal = target.state().ordinal();
+ }
+ finally {
+ unregisterTx(id);
+ }
+
+ return stateOrdinal;
+ }
+
+ /**
+ * Reads the current state of the transaction registered under the given ID.
+ *
+ * @param id Transaction ID.
+ * @return Ordinal of the transaction state.
+ */
+ public int txState(long id) {
+ return tx(id).state().ordinal();
+ }
+
+ /**
+ * Marks the transaction registered under the given ID as rollback-only.
+ *
+ * @param id Transaction ID.
+ * @return {@code True} if rollback only flag was set.
+ */
+ public boolean txSetRollbackOnly(long id) {
+ return tx(id).setRollbackOnly();
+ }
+
+ /**
+ * Commits tx in async mode.
+ */
+ public void txCommitAsync(final long txId, final long futId) {
+ final Transaction asyncTx = (Transaction)tx(txId).withAsync();
+
+ asyncTx.commit();
+
+ listenAndNotifyIntFuture(futId, asyncTx);
+ }
+
+ /**
+ * Rolls back tx in async mode.
+ */
+ public void txRollbackAsync(final long txId, final long futId) {
+ final Transaction asyncTx = (Transaction)tx(txId).withAsync();
+
+ asyncTx.rollback();
+
+ listenAndNotifyIntFuture(futId, asyncTx);
+ }
+
+ /**
+ * Chains the future of the async transaction into a future that always yields {@code null}
+ * and registers it with {@link PlatformFutureUtils} under the given platform future ID.
+ * The future is registered with type {@code TYP_OBJ}.
+ *
+ * @param futId ID of the platform future to notify.
+ * @param asyncTx Transaction in async mode whose future is listened to.
+ */
+ private void listenAndNotifyIntFuture(final long futId, final Transaction asyncTx) {
+ C1<IgniteFuture, Object> toNull = new C1<IgniteFuture, Object>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public Object apply(IgniteFuture ignored) {
+ return null;
+ }
+ };
+
+ IgniteFuture chained = asyncTx.future().chain(toNull);
+
+ PlatformFutureUtils.listen(interopCtx, chained, futId, PlatformFutureUtils.TYP_OBJ);
+ }
+
+ /**
+ * Resets transaction metrics by delegating to the underlying transactions facade.
+ */
+ public void resetMetrics() {
+ txs.resetMetrics();
+ }
+
+ /**
+ * Stores the transaction in the map of active transactions under a newly generated ID.
+ *
+ * @param tx Transaction.
+ * @return Transaction ID.
+ */
+ private long registerTx(Transaction tx) {
+ long newId = TX_ID_GEN.incrementAndGet();
+
+ Transaction prev = txMap.put(newId, tx);
+
+ // IDs come from a monotonically incremented sequence, so a previous mapping is unexpected.
+ assert prev == null : "Duplicate TX ids: " + prev;
+
+ return newId;
+ }
+
+ /**
+ * Removes the transaction with the given ID from the map of active transactions.
+ * A missing entry is only detected by an assertion.
+ *
+ * @param id Transaction ID.
+ */
+ private void unregisterTx(long id) {
+ Transaction removed = txMap.remove(id);
+
+ assert removed != null : "Failed to unregister transaction: " + id;
+ }
+
+ /**
+ * Looks up an active transaction by ID. A missing entry is only detected by an assertion,
+ * so with assertions disabled the result may be {@code null}.
+ *
+ * @param id ID.
+ * @return Transaction.
+ */
+ private Transaction tx(long id) {
+ Transaction found = txMap.get(id);
+
+ assert found != null : "Transaction not found for ID: " + id;
+
+ return found;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * For {@link #OP_CACHE_CONFIG_PARAMETERS} writes default transaction concurrency, isolation and timeout.
+ * For {@link #OP_METRICS} writes commit time, rollback time, number of commits and number of rollbacks.
+ * Any other operation type is passed to {@code throwUnsupported}.
+ */
+ @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+ switch (type) {
+ case OP_CACHE_CONFIG_PARAMETERS: {
+ TransactionConfiguration dfltCfg = interopCtx.kernalContext().config().getTransactionConfiguration();
+
+ writer.writeEnum(dfltCfg.getDefaultTxConcurrency());
+ writer.writeEnum(dfltCfg.getDefaultTxIsolation());
+ writer.writeLong(dfltCfg.getDefaultTxTimeout());
+
+ break;
+ }
+
+ case OP_METRICS: {
+ TransactionMetrics txMetrics = txs.metrics();
+
+ Date commitDate = new Date(txMetrics.commitTime());
+ Date rollbackDate = new Date(txMetrics.rollbackTime());
+
+ writer.writeDate(commitDate);
+ writer.writeDate(rollbackDate);
+ writer.writeInt(txMetrics.txCommits());
+ writer.writeInt(txMetrics.txRollbacks());
+
+ break;
+ }
+
+ default:
+ throwUnsupported(type);
+ }
+ }
+}
\ No newline at end of file