You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/24 09:19:40 UTC
[05/50] [abbrv] ignite git commit: 'Single' operations optimizations
for tx cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index db4a4b8..434b6c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -68,9 +68,7 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
@@ -87,7 +85,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
private static final long serialVersionUID = 0L;
/** DHT mappings. */
- private Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+ private IgniteTxMappings mappings;
/** Future. */
@GridToStringExclude
@@ -172,6 +170,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
subjId,
taskNameHash);
+ mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl();
+
initResult();
}
@@ -208,13 +208,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override protected IgniteUuid nearMiniId() {
- assert false : "nearMiniId should not be called for colocated transactions.";
-
- return null;
- }
-
- /** {@inheritDoc} */
@Override protected IgniteInternalFuture<Boolean> addReader(
long msgId,
GridDhtCacheEntry cached,
@@ -280,15 +273,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @return {@code True} if transaction is fully synchronous.
*/
private boolean sync() {
- if (super.syncCommit())
- return true;
-
- for (int cacheId : activeCacheIds()) {
- if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
- return true;
- }
-
- return false;
+ return super.syncCommit() || txState().sync(cctx);
}
/**
@@ -471,7 +456,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/**
* @return DHT map.
*/
- Map<UUID, GridDistributedTxMapping> mappings() {
+ IgniteTxMappings mappings() {
return mappings;
}
@@ -518,9 +503,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridDistributedTxMapping m = mappings.get(node.id());
if (m == null)
- mappings.put(node.id(), m = new GridDistributedTxMapping(node));
+ mappings.put(m = new GridDistributedTxMapping(node));
- IgniteTxEntry txEntry = txMap.get(key);
+ IgniteTxEntry txEntry = entry(key);
assert txEntry != null;
@@ -534,26 +519,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/**
- * Adds keys mapping.
- *
- * @param n Mapped node.
- * @param mappedKeys Mapped keys.
+ * @return Non-null entry if tx has only one write entry.
*/
- private void addKeyMapping(ClusterNode n, Iterable<IgniteTxKey> mappedKeys) {
- GridDistributedTxMapping m = mappings.get(n.id());
-
- if (m == null)
- mappings.put(n.id(), m = new GridDistributedTxMapping(n));
-
- for (IgniteTxKey key : mappedKeys) {
- IgniteTxEntry txEntry = txMap.get(key);
-
- assert txEntry != null;
-
- txEntry.nodeId(n.id());
-
- m.add(txEntry);
- }
+ @Nullable IgniteTxEntry singleWrite() {
+ return txState.singleWrite();
}
/**
@@ -567,7 +536,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
GridDistributedTxMapping m = mappings.get(n.id());
if (m == null) {
- m = F.addIfAbsent(mappings, n.id(), new GridDistributedTxMapping(n));
+ mappings.put(m = new GridDistributedTxMapping(n));
m.near(map.near());
@@ -575,8 +544,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
m.markExplicitLock();
}
- assert m != null;
-
for (IgniteTxEntry entry : map.entries())
m.add(entry);
}
@@ -588,6 +555,25 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/**
+ * @param map Mapping whose node, near flag and explicit lock flag are copied into a newly created mapping.
+ * @param entry Entry to add to the newly created mapping.
+ */
+ void addSingleEntryMapping(GridDistributedTxMapping map, IgniteTxEntry entry) {
+ ClusterNode n = map.node();
+
+ GridDistributedTxMapping m = new GridDistributedTxMapping(n);
+
+ mappings.put(m);
+
+ m.near(map.near());
+
+ if (map.explicitLock())
+ m.markExplicitLock();
+
+ m.add(entry);
+ }
+
+ /**
* @param nodeId Node ID to mark with explicit lock.
* @return {@code True} if mapping was found.
*/
@@ -623,8 +609,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers)
{
- Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), mapping.reads());
+ readyNearLocks(mapping.writes(), mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
+ readyNearLocks(mapping.reads(), mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers);
+ }
+ /**
+ * @param entries Entries.
+ * @param dhtVer DHT version.
+ * @param pendingVers Pending versions.
+ * @param committedVers Committed versions.
+ * @param rolledbackVers Rolled back versions.
+ */
+ void readyNearLocks(Collection<IgniteTxEntry> entries,
+ GridCacheVersion dhtVer,
+ Collection<GridCacheVersion> pendingVers,
+ Collection<GridCacheVersion> committedVers,
+ Collection<GridCacheVersion> rolledbackVers)
+ {
for (IgniteTxEntry txEntry : entries) {
while (true) {
GridCacheContext cacheCtx = txEntry.cached().context();
@@ -637,8 +638,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
// Handle explicit locks.
GridCacheVersion explicit = txEntry.explicitVersion();
- if (explicit == null)
- entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers);
+ if (explicit == null) {
+ entry.readyNearLock(xidVer,
+ dhtVer,
+ committedVers,
+ rolledbackVers,
+ pendingVers);
+ }
break;
}
@@ -871,7 +877,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
* @param writes Write entries.
* @param txNodes Transaction nodes mapping.
* @param last {@code True} if this is last prepare request.
- * @param lastBackups IDs of backup nodes receiving last prepare request.
* @return Future that will be completed when locks are acquired.
*/
@SuppressWarnings("TypeMayBeWeakened")
@@ -879,8 +884,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes,
- boolean last,
- Collection<UUID> lastBackups
+ boolean last
) {
if (state() != PREPARING) {
if (timedOut())
@@ -901,8 +905,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
IgniteUuid.randomUuid(),
Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
last,
- needReturnValue() && implicit(),
- lastBackups);
+ needReturnValue() && implicit());
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
@@ -1274,6 +1277,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString());
+ return S.toString(GridNearTxLocal.class, this, "mappings", mappings, "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 456d726..798635a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -99,7 +99,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param near {@code True} if mapping is for near caches.
* @param txNodes Transaction nodes mapping.
* @param last {@code True} if this last prepare request for node.
- * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare.
* @param onePhaseCommit One phase commit flag.
* @param retVal Return value flag.
* @param implicitSingle Implicit single flag.
@@ -118,7 +117,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
boolean near,
Map<UUID, Collection<UUID>> txNodes,
boolean last,
- Collection<UUID> lastBackups,
boolean onePhaseCommit,
boolean retVal,
boolean implicitSingle,
@@ -137,7 +135,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
this.topVer = topVer;
this.near = near;
this.last = last;
- this.lastBackups = lastBackups;
this.retVal = retVal;
this.implicitSingle = implicitSingle;
this.explicitLock = explicitLock;
@@ -153,12 +150,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return firstClientReq;
}
- /**
- * @return IDs of backup nodes receiving last prepare request during this prepare.
- */
- public Collection<UUID> lastBackups() {
- return lastBackups;
- }
/**
* @return {@code True} if this last prepare request for node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 87c68b2..d078df4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -34,10 +34,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteStateImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -74,7 +76,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
* @param ldr Class loader.
* @param nodeId Node ID.
* @param nearNodeId Near node ID.
- * @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
* @param sys System flag.
@@ -92,7 +93,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
ClassLoader ldr,
UUID nodeId,
UUID nearNodeId,
- long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
@@ -109,8 +109,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
super(
ctx,
nodeId,
- rmtThreadId,
- xidVer,
+ xidVer,
commitVer,
sys,
plc,
@@ -127,10 +126,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
this.nearNodeId = nearNodeId;
- readMap = Collections.emptyMap();
+ int writeSize = writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize;
- writeMap = new LinkedHashMap<>(
- writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize, 1.0f);
+ txState = new IgniteTxRemoteStateImpl(Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(),
+ U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(writeSize));
if (writeEntries != null) {
for (IgniteTxEntry entry : writeEntries) {
@@ -147,7 +146,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
* @param nodeId Node ID.
* @param nearNodeId Near node ID.
* @param nearXidVer Near transaction ID.
- * @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
* @param sys System flag.
@@ -163,7 +161,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
UUID nodeId,
UUID nearNodeId,
GridCacheVersion nearXidVer,
- long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
@@ -179,8 +176,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
super(
ctx,
nodeId,
- rmtThreadId,
- xidVer,
+ xidVer,
commitVer,
sys,
plc,
@@ -198,8 +194,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
this.nearXidVer = nearXidVer;
this.nearNodeId = nearNodeId;
- readMap = new LinkedHashMap<>(1, 1.0f);
- writeMap = new LinkedHashMap<>(txSize, 1.0f);
+ txState = new IgniteTxRemoteStateImpl(U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(1),
+ U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(txSize));
}
/** {@inheritDoc} */
@@ -322,7 +318,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
// Initialize cache entry.
entry.cached(cached);
- writeMap.put(entry.txKey(), entry);
+ txState.addWriteEntry(entry.txKey(), entry);
addExplicit(entry);
@@ -391,7 +387,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
drVer,
skipStore);
- writeMap.put(key, txEntry);
+ txState.addWriteEntry(key, txEntry);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java
new file mode 100644
index 0000000..0465510
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Container of {@link GridDistributedTxMapping} instances that are looked up by node ID.
+ */
+public interface IgniteTxMappings {
+ /**
+ * Removes all mappings.
+ */
+ public void clear();
+
+ /**
+ * @return {@code True} if there are no mappings.
+ */
+ public boolean empty();
+
+ /**
+ * @param nodeId Node ID.
+ * @return Mapping for the given node or {@code null} if there is none.
+ */
+ @Nullable public GridDistributedTxMapping get(UUID nodeId);
+
+ /**
+ * @param mapping Mapping to add.
+ */
+ public void put(GridDistributedTxMapping mapping);
+
+ /**
+ * @param nodeId Node ID.
+ * @return Removed mapping or {@code null} if there was no mapping for the given node.
+ */
+ @Nullable public GridDistributedTxMapping remove(UUID nodeId);
+
+ /**
+ * @return Mapping for local node or {@code null} if there is none.
+ */
+ @Nullable public GridDistributedTxMapping localMapping();
+
+ /**
+ * @return Non-null instance if these mappings contain only one mapping.
+ */
+ @Nullable public GridDistributedTxMapping singleMapping();
+
+ /**
+ * @return All mappings.
+ */
+ public Collection<GridDistributedTxMapping> mappings();
+
+ /**
+ * @return {@code True} if this is a single-mapping implementation.
+ */
+ public boolean single();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
new file mode 100644
index 0000000..7dec7af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * {@link IgniteTxMappings} implementation backed by a {@code ConcurrentHashMap8} keyed by node ID.
+ */
+public class IgniteTxMappingsImpl implements IgniteTxMappings {
+    /** Mappings keyed by the ID of the mapped node. */
+    private final Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        mappings.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean empty() {
+        return mappings.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDistributedTxMapping get(UUID nodeId) {
+        return mappings.get(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void put(GridDistributedTxMapping mapping) {
+        mappings.put(mapping.node().id(), mapping);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDistributedTxMapping remove(UUID nodeId) {
+        return mappings.remove(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDistributedTxMapping localMapping() {
+        for (GridDistributedTxMapping m : mappings.values()) {
+            if (m.node().isLocal())
+                return m;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean single() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDistributedTxMapping singleMapping() {
+        assert mappings.size() == 1 : mappings;
+
+        return F.firstValue(mappings);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridDistributedTxMapping> mappings() {
+        return mappings.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteTxMappingsImpl.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
new file mode 100644
index 0000000..fc15592
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link IgniteTxMappings} implementation that stores at most one mapping in a single volatile field.
+ */
+public class IgniteTxMappingsSingleImpl implements IgniteTxMappings {
+    /** The only mapping, {@code null} while nothing is mapped. */
+    private volatile GridDistributedTxMapping mapping;
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        mapping = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean empty() {
+        return mapping == null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDistributedTxMapping get(UUID nodeId) {
+        // Read the volatile field once so the null check and the node comparison use the same value.
+        GridDistributedTxMapping mapping0 = mapping;
+
+        return (mapping0 != null && mapping0.node().id().equals(nodeId)) ? mapping0 : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void put(GridDistributedTxMapping mapping) {
+        // Only one mapping can be held; without this check a second put would replace the first one.
+        assert this.mapping == null;
+
+        this.mapping = mapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDistributedTxMapping remove(UUID nodeId) {
+        GridDistributedTxMapping mapping0 = mapping;
+
+        if (mapping0 != null && mapping0.node().id().equals(nodeId)) {
+            this.mapping = null;
+
+            return mapping0;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDistributedTxMapping localMapping() {
+        GridDistributedTxMapping mapping0 = mapping;
+
+        if (mapping0 != null && mapping0.node().isLocal())
+            return mapping0;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean single() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridDistributedTxMapping singleMapping() {
+        return mapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridDistributedTxMapping> mappings() {
+        GridDistributedTxMapping mapping0 = mapping;
+
+        // Never return null: expose the single mapping (if any) as a read-only collection.
+        if (mapping0 == null)
+            return Collections.emptyList();
+
+        return Collections.singletonList(mapping0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteTxMappingsSingleImpl.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 94af6bb..f5f99f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -275,9 +275,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
public boolean implicitSingle();
/**
- * @return Collection of cache IDs involved in this transaction.
+ * @return Transaction state.
*/
- public Collection<Integer> activeCacheIds();
+ public IgniteTxState txState();
/**
* @return {@code true} or {@code false} if the deployment is enabled or disabled for all active caches involved
@@ -669,14 +669,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
public boolean serializable();
/**
- * Checks whether given key has been removed within transaction.
- *
- * @param key Key to check.
- * @return {@code True} if key has been removed.
- */
- public boolean removed(IgniteTxKey key);
-
- /**
* Gets allowed remaining time for this transaction.
*
* @return Remaining time.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb2ca2c..6a0f8ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -120,10 +118,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
@GridToStringInclude
protected boolean implicit;
- /** Implicit with one key flag. */
- @GridToStringInclude
- protected boolean implicitSingle;
-
/** Local flag. */
@GridToStringInclude
protected boolean loc;
@@ -262,7 +256,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
* @param cctx Cache registry.
* @param xidVer Transaction ID.
* @param implicit Implicit flag.
- * @param implicitSingle Implicit with one key flag.
* @param loc Local flag.
* @param sys System transaction flag.
* @param plc IO policy.
@@ -275,7 +268,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
GridCacheSharedContext<?, ?> cctx,
GridCacheVersion xidVer,
boolean implicit,
- boolean implicitSingle,
boolean loc,
boolean sys,
byte plc,
@@ -295,7 +287,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
this.cctx = cctx;
this.xidVer = xidVer;
this.implicit = implicit;
- this.implicitSingle = implicitSingle;
this.loc = loc;
this.sys = sys;
this.plc = plc;
@@ -362,7 +353,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
this.taskNameHash = taskNameHash;
implicit = false;
- implicitSingle = false;
loc = false;
if (log == null)
@@ -421,45 +411,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean storeUsed() {
- if (!storeEnabled())
- return false;
-
- Collection<Integer> cacheIds = activeCacheIds();
-
- if (!cacheIds.isEmpty()) {
- for (int cacheId : cacheIds) {
- CacheStoreManager store = cctx.cacheContext(cacheId).store();
-
- if (store.configured())
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * Store manager for current transaction.
- *
- * @return Store manager.
- */
- protected Collection<CacheStoreManager> stores() {
- Collection<Integer> cacheIds = activeCacheIds();
-
- if (!cacheIds.isEmpty()) {
- Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
-
- for (int cacheId : cacheIds) {
- CacheStoreManager store = cctx.cacheContext(cacheId).store();
-
- if (store.configured())
- stores.add(store);
- }
-
- return stores;
- }
-
- return null;
+ return storeEnabled() && txState().storeUsed(cctx);
}
/**
@@ -645,7 +597,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
- return implicitSingle;
+ return txState().implicitSingle();
}
/** {@inheritDoc} */
@@ -1758,11 +1710,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public Collection<Integer> activeCacheIds() {
- throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
- }
-
- /** {@inheritDoc} */
@Override public boolean activeCachesDeploymentEnabled() {
return false;
}
@@ -1877,6 +1824,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public IgniteTxState txState() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<UUID> masterNodeIds() {
return null;
}
@@ -2150,11 +2102,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Override public boolean removed(IgniteTxKey key) {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public long remainingTime() throws IgniteTxTimeoutCheckedException {
return 0;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 570aa48..0e5657b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -216,8 +216,7 @@ public class IgniteTxHandler {
req.reads(),
req.writes(),
req.transactionNodes(),
- req.last(),
- req.lastBackups());
+ req.last());
if (locTx.isRollbackOnly())
locTx.rollbackAsync();
@@ -398,7 +397,6 @@ public class IgniteTxHandler {
if (req.onePhaseCommit()) {
assert req.last();
- assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1;
tx.onePhaseCommit(true);
}
@@ -413,8 +411,7 @@ public class IgniteTxHandler {
req.messageId(),
req.miniId(),
req.transactionNodes(),
- req.last(),
- req.lastBackups());
+ req.last());
if (tx.isRollbackOnly()) {
try {
@@ -1091,12 +1088,13 @@ public class IgniteTxHandler {
GridDhtTxRemote tx = ctx.tm().tx(req.version());
if (tx == null) {
+ boolean single = req.last() && req.writes().size() == 1;
+
tx = new GridDhtTxRemote(
ctx,
req.nearNodeId(),
req.futureId(),
nodeId,
- req.threadId(),
req.topologyVersion(),
req.version(),
null,
@@ -1110,7 +1108,8 @@ public class IgniteTxHandler {
req.nearXidVersion(),
req.transactionNodes(),
req.subjectId(),
- req.taskNameHash());
+ req.taskNameHash(),
+ single);
tx.writeVersion(req.writeVersion());
@@ -1138,7 +1137,7 @@ public class IgniteTxHandler {
tx.transactionNodes(req.transactionNodes());
}
- if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {
+ if (!tx.isSystemInvalidate()) {
int idx = 0;
for (IgniteTxEntry entry : req.writes()) {
@@ -1236,7 +1235,6 @@ public class IgniteTxHandler {
ldr,
nodeId,
req.nearNodeId(),
- req.threadId(),
req.version(),
null,
req.system(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
new file mode 100644
index 0000000..5f48469
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Local transaction state that tracks at most one cache context and at most one write entry; read collections are always empty.
+ */
+public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
+ /** Context of the only active cache, {@code null} until {@link #addActiveCache} is called. */
+ private GridCacheContext cacheCtx;
+
+ /** The only (write) entry of this state, {@code null} until {@link #addEntry} is called. */
+ private IgniteTxEntry entry;
+
+ /** Flag set by the first {@link #init} call. */
+ private boolean init;
+
+ /** {@inheritDoc} */
+ @Override public void addActiveCache(GridCacheContext ctx, IgniteTxLocalAdapter tx)
+ throws IgniteCheckedException {
+ assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name() + ", new=" + ctx.name() + ']';
+
+ this.cacheCtx = ctx;
+
+ tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
+ return cacheCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Integer firstCacheId() {
+ return cacheCtx != null ? cacheCtx.cacheId() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitLastFut(GridCacheSharedContext ctx) {
+ if (cacheCtx == null)
+ return;
+
+ cacheCtx.cache().awaitLastFut();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean implicitSingle() {
+ return true;
+ }
+
+ /** {@inheritDoc} A cache validation error, if any, is attached as the cause of the returned exception. */
+ @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut) {
+ if (cacheCtx == null)
+ return null;
+
+ Throwable err = topFut.validateCache(cacheCtx);
+
+ if (err != null) {
+ return new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ U.maskName(cacheCtx.name()), err);
+ }
+
+ if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) {
+ return new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
+ "partition nodes left the grid): " + cacheCtx.name());
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sync(GridCacheSharedContext cctx) {
+ return cacheCtx != null && cacheCtx.config().getWriteSynchronizationMode() == FULL_SYNC;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
+ return cacheCtx != null && cacheCtx.isNear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) {
+ if (cacheCtx == null || cacheCtx.isLocal())
+ return cctx.exchange().lastTopologyFuture();
+
+ cacheCtx.topology().readLock();
+
+ if (cacheCtx.topology().stopping()) {
+ fut.onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cacheCtx.name()));
+
+ return null;
+ }
+
+ return cacheCtx.topology().topologyVersionFuture();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void topologyReadUnlock(GridCacheSharedContext cctx) {
+ if (cacheCtx == null || cacheCtx.isLocal())
+ return;
+
+ cacheCtx.topology().readUnlock();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ if (cacheCtx == null)
+ return false;
+
+ CacheStoreManager store = cacheCtx.store();
+
+ return store.configured();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
+ if (cacheCtx == null)
+ return null;
+
+ CacheStoreManager store = cacheCtx.store();
+
+ if (store.configured())
+ return Collections.singleton(store);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
+ if (cacheCtx != null)
+ onTxEnd(cacheCtx, tx, commit);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry entry(IgniteTxKey key) {
+ if (entry != null && entry.txKey().equals(key))
+ return entry;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasWriteKey(IgniteTxKey key) {
+ return entry != null && entry.txKey().equals(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> readSet() {
+ return Collections.emptySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> writeSet() {
+ return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> writeEntries() {
+ return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> readEntries() {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
+ return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
+ Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return entry == null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> allEntries() {
+ return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean init(int txSize) {
+ if (!init) {
+ init = true;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean initialized() {
+ return init;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addEntry(IgniteTxEntry entry) {
+ assert this.entry == null : "Entry already set [cur=" + this.entry + ", new=" + entry + ']';
+
+ this.entry = entry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void seal() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry singleWrite() {
+ return entry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
+ }
+}