You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/01 11:24:10 UTC
[01/12] ignite git commit: Proper initialization of utility and
marshalling pools.
Repository: ignite
Updated Branches:
refs/heads/ignite-2224 eccbd0d80 -> 21567fab3
Proper initialization of utility and marshalling pools.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2e5638d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2e5638d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2e5638d
Branch: refs/heads/ignite-2224
Commit: c2e5638d01fbe44f90878d1aee6bf4979a935a7c
Parents: a34d705
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jan 29 15:51:44 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Jan 29 15:51:44 2016 +0300
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/internal/IgnitionEx.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2e5638d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5153fb3..8f23b05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1662,7 +1662,7 @@ public class IgnitionEx {
"utility",
cfg.getGridName(),
myCfg.getUtilityCacheThreadPoolSize(),
- DFLT_SYSTEM_MAX_THREAD_CNT,
+ myCfg.getUtilityCacheThreadPoolSize(),
myCfg.getUtilityCacheKeepAliveTime(),
new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
@@ -1670,7 +1670,7 @@ public class IgnitionEx {
"marshaller-cache",
cfg.getGridName(),
myCfg.getMarshallerCacheThreadPoolSize(),
- DFLT_SYSTEM_MAX_THREAD_CNT,
+ myCfg.getMarshallerCacheThreadPoolSize(),
myCfg.getMarshallerCacheKeepAliveTime(),
new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
[12/12] ignite git commit: ignite-2224 getEntry method inside
transaction.
Posted by sb...@apache.org.
ignite-2224 getEntry method inside transaction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21567fab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21567fab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21567fab
Branch: refs/heads/ignite-2224
Commit: 21567fab3afc5c7f2fc7bb4204cf75f2328511d0
Parents: 4ede58b
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 1 13:23:13 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 1 13:23:13 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheEntryImplEx.java | 14 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 4 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 4 +
.../GridNearPessimisticTxPrepareFuture.java | 2 +
.../cache/transactions/IgniteTxEntry.java | 32 +-
.../transactions/IgniteTxLocalAdapter.java | 50 +-
.../cache/transactions/IgniteTxManager.java | 2 +-
.../cache/CacheGetEntryAbstractSeltTest.java | 525 ------------
.../cache/CacheGetEntryAbstractTest.java | 803 +++++++++++++++++++
...GetEntryOptimisticReadCommittedSeltTest.java | 5 +-
...etEntryOptimisticRepeatableReadSeltTest.java | 5 +-
...eGetEntryOptimisticSerializableSeltTest.java | 5 +-
...etEntryPessimisticReadCommittedSeltTest.java | 5 +-
...tEntryPessimisticRepeatableReadSeltTest.java | 5 +-
...GetEntryPessimisticSerializableSeltTest.java | 5 +-
16 files changed, 904 insertions(+), 564 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
index 1c7111a..af926c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
@@ -21,9 +21,13 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+
/**
*
*/
@@ -54,6 +58,14 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach
/** {@inheritDoc} */
public GridCacheVersion version() {
+ if (ver == GET_ENTRY_INVALID_VER_AFTER_GET) {
+ throw new IgniteException("Impossible to get entry version after " +
+ "get() inside OPTIMISTIC REPEATABLE_READ transaction. Use only getEntry() or getEntries() inside " +
+ "OPTIMISTIC REPEATABLE_READ transaction to solve this problem.");
+ }
+ else if (ver == GET_ENTRY_INVALID_VER_UPDATED)
+ throw new IgniteException("Impossible to get version for entry updated in transaction.");
+
return ver;
}
@@ -81,7 +93,7 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach
String res = "CacheEntry [key=" + getKey() +
", val=" + getValue();
- if (ver != null) {
+ if (ver != null && ver != GET_ENTRY_INVALID_VER_AFTER_GET && ver != GET_ENTRY_INVALID_VER_UPDATED) {
res += ", topVer=" + ver.topologyVersion() +
", nodeOrder=" + ver.nodeOrder() +
", order=" + ver.order() +
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d8b2f37..41b28d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -946,7 +946,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (retVal ||
!F.isEmpty(e.entryProcessors()) ||
!F.isEmpty(e.filters()) ||
- e.serializableReadVersion() != null) {
+ e.entryReadVersion() != null) {
if (map == null)
map = new HashMap<>();
@@ -1013,7 +1013,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
throws IgniteCheckedException {
try {
for (IgniteTxEntry entry : entries) {
- GridCacheVersion serReadVer = entry.serializableReadVersion();
+ GridCacheVersion serReadVer = entry.entryReadVersion();
if (serReadVer != null) {
entry.cached().unswap();
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 4f9f227..52ebfc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -107,7 +107,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
if (txEntry != null) {
if (entry.context().isLocal()) {
- GridCacheVersion serReadVer = txEntry.serializableReadVersion();
+ GridCacheVersion serReadVer = txEntry.entryReadVersion();
if (serReadVer != null) {
GridCacheContext ctx = entry.context();
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bae0327..b968e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -279,6 +279,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
private void prepareSingle(IgniteTxEntry write, boolean topLocked) {
+ write.clearEntryReadVersion();
+
AffinityTopologyVersion topVer = tx.topologyVersion();
assert topVer.topologyVersion() > 0;
@@ -339,6 +341,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
for (IgniteTxEntry write : writes) {
+ write.clearEntryReadVersion();
+
GridDistributedTxMapping updated = map(write, topVer, cur, topLocked);
if (cur != updated) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 8170008..615a92b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -176,6 +176,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
txMapping = new GridDhtTxMapping();
for (IgniteTxEntry txEntry : tx.allEntries()) {
+ txEntry.clearEntryReadVersion();
+
GridCacheContext cacheCtx = txEntry.context();
List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f731975..8b871a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
@@ -73,6 +74,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** Dummy version for any existing entry read in SERIALIZABLE transaction. */
public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1);
+ /** Dummy version for an entry that was updated inside the transaction (operation is not READ). */
+ public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2);
+
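+ /** Dummy version for an entry whose version was not recorded because it was read by get() in an OPTIMISTIC REPEATABLE_READ transaction. */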
+ public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3);
+
/** Prepared flag updater. */
private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
@@ -918,13 +925,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
- * @param serReadVer Read version for serializable transaction.
+ * Gets the stored entry version. The version is stored for all entries in a serializable transaction or
+ * when the value is read using the {@link IgniteCache#getEntry(Object)} method.
+ *
+ * @return Entry version.
+ */
+ @Nullable public GridCacheVersion entryReadVersion() {
+ return serReadVer;
+ }
+
+ /**
+ * @param ver Entry version.
*/
- public void serializableReadVersion(GridCacheVersion serReadVer) {
+ public void entryReadVersion(GridCacheVersion ver) {
assert this.serReadVer == null;
- assert serReadVer != null;
+ assert ver != null;
- this.serReadVer = serReadVer;
+ this.serReadVer = ver;
+ }
+
+ /**
+ * Clears the recorded read version; should be done before starting commit of a transaction that is not optimistic serializable.
+ */
+ public void clearEntryReadVersion() {
+ serReadVer = null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 298413b..a999358 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1409,25 +1410,34 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (needVer) {
if (txEntry.op() != READ)
- ver = xidVersion();
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
else {
- if (serializable()) {
- ver = txEntry.serializableReadVersion();
+ ver = txEntry.entryReadVersion();
- assert ver != null; // TODO: value should not be null;
- }
- else if (optimistic()) {
- assert isolation() == TransactionIsolation.REPEATABLE_READ;
+ if (ver == null && pessimistic()) {
+ while (true) {
+ try {
+ GridCacheEntryEx cached = txEntry.cached();
+
+ ver = cached.isNear() ?
+ ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
- throw new IgniteCheckedException(
- "Impossible to getEntry() or getEntries() after get() at " +
- "OPTIMISTIC REPEATABLE_READ case. " +
- "Use only getEntry() or getEntries() to solve the problem. ");
+ break;
+ }
+ catch (GridCacheEntryRemovedException rmvdErr) {
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+ }
+ }
}
- else {
- assert ver != null; // TODO: value should not be null;
+
+ if (ver == null) {
+ assert optimistic() && repeatableRead() : this;
+
+ ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
}
}
+
+ assert ver != null;
}
cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, ver);
@@ -1601,7 +1611,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (needReadVer) {
assert readVer != null;
- txEntry.serializableReadVersion(readVer);
+ txEntry.entryReadVersion(readVer);
}
}
}
@@ -1751,7 +1761,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (needReadVer) {
assert loadVer != null;
- txEntry.serializableReadVersion(loadVer);
+ txEntry.entryReadVersion(loadVer);
}
if (visibleVal != null) {
@@ -1771,6 +1781,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
final GridCacheContext cacheCtx,
Collection<KeyCacheObject> keys,
@@ -1915,6 +1926,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
deserializeBinary,
false,
readVer);
+
+ if (readVer != null)
+ txEntry.entryReadVersion(readVer);
}
// Even though we bring the value back from lock acquisition,
@@ -2399,7 +2413,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (needReadVer) {
assert loadVer != null;
- e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+ e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
}
if (singleRmv) {
@@ -2592,7 +2606,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (needReadVer) {
assert readVer != null;
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
}
}
@@ -2645,7 +2659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (needReadVer) {
assert readVer != null;
- txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+ txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
}
if (retval && !transform)
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 7a3b8ff..643ba2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1419,7 +1419,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
assert !entry1.detached() : "Expected non-detached entry for near transaction " +
"[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']';
- GridCacheVersion serReadVer = txEntry1.serializableReadVersion();
+ GridCacheVersion serReadVer = txEntry1.entryReadVersion();
assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1;
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java
deleted file mode 100644
index 2e94cbc..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java
+++ /dev/null
@@ -1,525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheEntry;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-
-/**
- * Test getEntry and getEntries methods.
- */
-public abstract class CacheGetEntryAbstractSeltTest extends GridCacheAbstractSelfTest {
-
- @Override protected int gridCount() {
- return 3;
- }
-
- abstract protected TransactionConcurrency concurrency();
-
- abstract protected TransactionIsolation isolation();
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setMarshaller(null);
-
- return cfg;
- }
-
- /** */
- public void testNear() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.PARTITIONED);
- cfg.setName("near");
- cfg.setNearConfiguration(new NearCacheConfiguration());
-
- test(cfg);
- }
-
- /** */
- public void testNearTransactional() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.PARTITIONED);
- cfg.setAtomicityMode(TRANSACTIONAL);
- cfg.setName("nearT");
- cfg.setNearConfiguration(new NearCacheConfiguration());
-
- test(cfg);
- }
-
- /** */
- public void testPartitioned() {
- CacheConfiguration cfg = new CacheConfiguration();
- cfg.setCacheMode(CacheMode.PARTITIONED);
- cfg.setName("partitioned");
-
- test(cfg);
- }
-
- /** */
- public void testPartitionedTransactional() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.PARTITIONED);
- cfg.setAtomicityMode(TRANSACTIONAL);
- cfg.setName("partitionedT");
-
- test(cfg);
- }
-
- /** */
- public void testLocal() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.LOCAL);
- cfg.setName("local");
-
- test(cfg);
- }
-
- /** */
- public void testLocalTransactional() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.LOCAL);
- cfg.setAtomicityMode(TRANSACTIONAL);
- cfg.setName("localT");
-
- test(cfg);
- }
-
- /** */
- public void testReplicated() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.REPLICATED);
- cfg.setName("replicated");
-
- test(cfg);
- }
-
- /** */
- public void testReplicatedTransactional() {
- CacheConfiguration cfg = new CacheConfiguration();
-
- cfg.setCacheMode(CacheMode.REPLICATED);
- cfg.setAtomicityMode(TRANSACTIONAL);
- cfg.setName("replicatedT");
-
- test(cfg);
- }
-
- /** */
- private void test(CacheConfiguration cfg) {
- test(cfg, true);
- test(cfg, false);
- }
-
- /** */
- private void test(CacheConfiguration cfg, boolean oneEntry) {
- IgniteCache<Integer, TestValue> cache = grid(0).createCache(cfg);
- try {
- init(cache);
-
- test(cache, null, null, null, oneEntry);
-
- if (cfg.getAtomicityMode() == TRANSACTIONAL) {
- TransactionConcurrency txConcurrency = concurrency();
- TransactionIsolation txIsolation = isolation();
- try (Transaction tx = grid(0).transactions().txStart(txConcurrency, txIsolation, 100000, 1000)) {
- initTx(cache);
-
- test(cache, txConcurrency, txIsolation, tx, oneEntry);
-
- tx.commit();
- }
- }
- }
- finally {
- cache.destroy();
- }
- }
-
- private Set<Integer> getKeys(int base) {
- int start = 0;
- int finish = 100;
-
- Set<Integer> keys = new HashSet<>(finish - start);
-
- for (int i = base + start; i < base + finish; ++i)
- keys.add(i);
-
- return keys;
- }
-
- private Set<Integer> createdBeforeTxKeys() {
- return getKeys(0);
- }
-
- private Set<Integer> createdBeforeTxWithBinaryKeys() {
- return getKeys(1_000);
- }
-
- private Set<Integer> createdBeforeTxKeys2() {
- return getKeys(2_000);
- }
-
- private Set<Integer> createdBeforeTxWithBinaryKeys2() {
- return getKeys(3_000);
- }
-
- private Set<Integer> createdBeforeTxKeys3() {
- return getKeys(4_000);
- }
-
- private Set<Integer> createdBeforeTxWithBinaryKeys3() {
- return getKeys(5_000);
- }
-
- private Set<Integer> removedBeforeTxKeys() {
- return getKeys(6_000);
- }
-
- private Set<Integer> removedBeforeTxWithBinaryKeys() {
- return getKeys(7_000);
- }
-
- private Set<Integer> createdAtTxKeys() {
- return getKeys(8_000);
- }
-
- private Set<Integer> createdAtTxWithBinaryKeys() {
- return getKeys(9_000);
- }
-
- private Set<Integer> removedAtTxKeys() {
- return getKeys(10_000);
- }
-
- private Set<Integer> removedAtTxWithBinaryKeys() {
- return getKeys(11_000);
- }
-
- /** */
- private void init(IgniteCache<Integer, TestValue> cache) {
- Set<Integer> keys = new HashSet<>();
-
- keys.addAll(createdBeforeTxKeys());
- keys.addAll(createdBeforeTxWithBinaryKeys());
- keys.addAll(createdBeforeTxKeys2());
- keys.addAll(createdBeforeTxWithBinaryKeys2());
- keys.addAll(createdBeforeTxKeys3());
- keys.addAll(createdBeforeTxWithBinaryKeys3());
- keys.addAll(removedBeforeTxKeys());
- keys.addAll(removedBeforeTxWithBinaryKeys());
- keys.addAll(removedAtTxKeys());
- keys.addAll(removedAtTxWithBinaryKeys());
-
- for (int i : keys)
- cache.put(i, new TestValue(i));
-
- for (int i : removedBeforeTxKeys())
- cache.remove(i);
-
- for (int i : removedBeforeTxWithBinaryKeys())
- cache.remove(i);
- }
-
- /** */
- private void initTx(IgniteCache<Integer, TestValue> cache) {
- for (int i : createdAtTxKeys())
- cache.put(i, new TestValue(i));
-
- for (int i : createdAtTxWithBinaryKeys())
- cache.put(i, new TestValue(i));
-
- for (int i : removedAtTxKeys())
- cache.remove(i);
-
- for (int i : removedAtTxWithBinaryKeys())
- cache.remove(i);
- }
-
- /** */
- private void compareVersionWithPrimaryNode(CacheEntry<Integer, ?> e, IgniteCache<Integer, TestValue> cache) {
- CacheConfiguration cfg = cache.getConfiguration(CacheConfiguration.class);
-
- if (cfg.getCacheMode() != CacheMode.LOCAL) {
- Ignite prim = primaryNode(e.getKey(), cache.getName());
-
- GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)prim).internalCache(cache.getName());
-
- if (cfg.getNearConfiguration() != null)
- cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
-
- IgniteCacheObjectProcessor cacheObjects = cacheAdapter.context().cacheObjects();
-
- CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
-
- GridCacheMapEntry me = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(
- cacheObjCtx, e.getKey(), true));
-
- try {
- assertEquals(me.version(), e.version());
- }
- catch (GridCacheEntryRemovedException ex) {
- throw new RuntimeException(ex);
- }
- }
- }
-
- private void checkData(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry, GridCacheVersion txVer) {
- if (oneEntry) {
- CacheEntry<Integer, TestValue> e = cache.getEntry(i);
-
- if (txVer != null)
- assertEquals(txVer, e.version());
- else
- compareVersionWithPrimaryNode(e, cache);
-
- assertEquals(e.getValue().val, i);
- }
- else {
- Set<Integer> set = new HashSet<>();
-
- for (int j = 0; j < 10; j++)
- set.add(i + j);
-
- Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
-
- for (CacheEntry<Integer, TestValue> e : es) {
- if (txVer != null)
- assertEquals(txVer, e.version());
- else
- compareVersionWithPrimaryNode(e, cache);
-
- assertEquals((Integer)e.getValue().val, e.getKey());
-
- assertTrue(set.contains(e.getValue().val));
- }
- }
- }
-
- private void checkBinaryData(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry,
- GridCacheVersion txVer) {
- IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
-
- if (oneEntry) {
- CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
-
- if (txVer != null)
- assertEquals(txVer, e.version());
- else
- compareVersionWithPrimaryNode(e, cache);
-
- assertEquals(((TestValue)e.getValue().deserialize()).val, i);
- }
- else {
- Set<Integer> set = new HashSet<>();
-
- for (int j = 0; j < 10; j++)
- set.add(i + j);
-
- Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
-
- for (CacheEntry<Integer, BinaryObject> e : es) {
- if (txVer != null)
- assertEquals(txVer, e.version());
- else
- compareVersionWithPrimaryNode(e, cache);
-
- TestValue tv = e.getValue().deserialize();
-
- assertEquals((Integer)tv.val, e.getKey());
-
- assertTrue(set.contains((tv).val));
- }
- }
- }
-
- private void checkRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
- if (oneEntry) {
- CacheEntry<Integer, TestValue> e = cache.getEntry(i);
-
- assertNull(e);
- }
- else {
- Set<Integer> set = new HashSet<>();
-
- for (int j = 0; j < 10; j++)
- set.add(i + j);
-
- Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
-
- assertTrue(es.isEmpty());
- }
- }
-
- private void checkBinaryRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
- IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
-
- if (oneEntry) {
- CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
-
- assertNull(e);
- }
- else {
- Set<Integer> set = new HashSet<>();
-
- for (int j = 0; j < 10; j++)
- set.add(i + j);
-
- Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
-
- assertTrue(es.isEmpty());
- }
- }
-
- /** */
- private void test(IgniteCache<Integer, TestValue> cache,
- TransactionConcurrency txConcurrency,
- TransactionIsolation txIsolation,
- Transaction tx,
- boolean oneEntry) {
- if (tx == null) {
- for (int i : createdBeforeTxKeys()) {
- checkData(cache, i, oneEntry, null);
- }
-
- for (int i : createdBeforeTxWithBinaryKeys()) {
- checkBinaryData(cache, i, oneEntry, null);
- }
-
- for (int i : removedBeforeTxKeys()) {
- checkRemoved(cache, i, oneEntry);
- }
-
- for (int i : removedBeforeTxWithBinaryKeys()) {
- checkBinaryRemoved(cache, i, oneEntry);
- }
- }
- else {
- GridCacheVersion txVer = ((TransactionProxyImpl)tx).tx().xidVersion();
-
- for (int i : createdBeforeTxKeys2()) {
- checkData(cache, i, oneEntry, null);
- checkData(cache, i, oneEntry, null);
- }
-
- for (int i : createdBeforeTxWithBinaryKeys2()) {
- checkBinaryData(cache, i, oneEntry, null);
- checkBinaryData(cache, i, oneEntry, null);
- }
-
- try {
- for (int i : createdBeforeTxKeys3()) {
- cache.get(i);
-
- checkData(cache, i, oneEntry, null);
- }
-
- for (int i : createdBeforeTxWithBinaryKeys3()) {
- cache.get(i);
-
- checkBinaryData(cache, i, oneEntry, null);
- }
-
- assertFalse(txIsolation == TransactionIsolation.REPEATABLE_READ &&
- txConcurrency == TransactionConcurrency.OPTIMISTIC);
- }
- catch (Exception e) {
- assertTrue(txIsolation == TransactionIsolation.REPEATABLE_READ &&
- txConcurrency == TransactionConcurrency.OPTIMISTIC);
- }
-
- for (int i : createdAtTxKeys()) {
- checkData(cache, i, oneEntry, txVer);
- }
-
- for (int i : createdAtTxWithBinaryKeys()) {
- checkBinaryData(cache, i, oneEntry, txVer);
- }
-
- for (int i : removedBeforeTxKeys()) {
- checkRemoved(cache, i, oneEntry);
- }
-
- for (int i : removedBeforeTxWithBinaryKeys()) {
- checkBinaryRemoved(cache, i, oneEntry);
- }
- for (int i : removedAtTxKeys()) {
- checkRemoved(cache, i, oneEntry);
- }
-
- for (int i : removedAtTxWithBinaryKeys()) {
- checkBinaryRemoved(cache, i, oneEntry);
- }
- }
- }
-
- /**
- *
- */
- private static class TestValue implements Serializable {
- /** */
- private int val;
-
- /**
- * @param val Value.
- */
- public TestValue(int val) {
- this.val = val;
- }
-
- /**
- * @return Value.
- */
- public int value() {
- return val;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(TestValue.class, this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
new file mode 100644
index 0000000..c0ba42c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTest {
+ /** */
+ private static final String UPDATED_ENTRY_ERR = "Impossible to get version for entry updated in transaction";
+
+ /** */
+ private static final String ENTRY_AFTER_GET_ERR = "Impossible to get entry version after get()";
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /**
+ * @return Transaction concurrency.
+ */
+ abstract protected TransactionConcurrency concurrency();
+
+ /**
+ *
+ * @return Transaction isolation.
+ */
+ abstract protected TransactionIsolation isolation();
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(null);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNear() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("near");
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNearTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("nearT");
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitioned() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("partitioned");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(PARTITIONED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("partitionedT");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocal() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(LOCAL);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("local");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocalTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(LOCAL);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("localT");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicated() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(REPLICATED);
+ cfg.setAtomicityMode(ATOMIC);
+ cfg.setName("replicated");
+
+ test(cfg);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedTransactional() throws Exception {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setWriteSynchronizationMode(FULL_SYNC);
+ cfg.setCacheMode(REPLICATED);
+ cfg.setAtomicityMode(TRANSACTIONAL);
+ cfg.setName("replicatedT");
+
+ test(cfg);
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ * @throws Exception If failed.
+ */
+ private void test(CacheConfiguration cfg) throws Exception {
+ test(cfg, true);
+
+ test(cfg, false);
+ }
+
+ /**
+ * @param cfg Cache configuration.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @throws Exception If failed.
+ */
+ private void test(CacheConfiguration cfg, final boolean oneEntry) throws Exception {
+ final IgniteCache<Integer, TestValue> cache = grid(0).createCache(cfg);
+
+ try {
+ init(cache);
+
+ test(cache, null, null, null, oneEntry);
+
+ if (cfg.getAtomicityMode() == TRANSACTIONAL) {
+ TransactionConcurrency txConcurrency = concurrency();
+ TransactionIsolation txIsolation = isolation();
+
+ try (Transaction tx = grid(0).transactions().txStart(txConcurrency, txIsolation)) {
+ initTx(cache);
+
+ test(cache, txConcurrency, txIsolation, tx, oneEntry);
+
+ tx.commit();
+ }
+
+ testConcurrentTx(cache, OPTIMISTIC, REPEATABLE_READ, oneEntry);
+ testConcurrentTx(cache, OPTIMISTIC, READ_COMMITTED, oneEntry);
+
+ testConcurrentTx(cache, PESSIMISTIC, REPEATABLE_READ, oneEntry);
+ testConcurrentTx(cache, PESSIMISTIC, READ_COMMITTED, oneEntry);
+ }
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param txConcurrency Transaction concurrency.
+ * @param txIsolation Transaction isolation.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @throws Exception If failed.
+ */
+ private void testConcurrentTx(final IgniteCache<Integer, TestValue> cache,
+ final TransactionConcurrency txConcurrency,
+ final TransactionIsolation txIsolation,
+ final boolean oneEntry) throws Exception {
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgniteTransactions txs = grid(0).transactions();
+
+ long stopTime = System.currentTimeMillis() + 3000;
+
+ while (System.currentTimeMillis() < stopTime) {
+ Set<Integer> keys = new LinkedHashSet<>();
+
+ for (int i = 0; i < 100; i++)
+ keys.add(i);
+
+ try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
+ if (oneEntry) {
+ for (int i = 0; i < 100; i++)
+ cache.getEntry(i);
+ }
+ else
+ cache.getEntries(keys);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, new TestValue(i));
+
+ tx.commit();
+ }
+ }
+
+ return null;
+ }
+ }, 10, "tx-thread");
+ }
+
+ /**
+ * @param base Start value.
+ * @return Keys.
+ */
+ private Set<Integer> getKeys(int base) {
+ int start = 0;
+ int finish = 100;
+
+ Set<Integer> keys = new HashSet<>(finish - start);
+
+ for (int i = base + start; i < base + finish; ++i)
+ keys.add(i);
+
+ return keys;
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxKeys() {
+ return getKeys(0);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxWithBinaryKeys() {
+ return getKeys(1_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxKeys2() {
+ return getKeys(2_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxWithBinaryKeys2() {
+ return getKeys(3_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxKeys3() {
+ return getKeys(4_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdBeforeTxWithBinaryKeys3() {
+ return getKeys(5_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedBeforeTxKeys() {
+ return getKeys(6_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedBeforeTxWithBinaryKeys() {
+ return getKeys(7_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdAtTxKeys() {
+ return getKeys(8_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> createdAtTxWithBinaryKeys() {
+ return getKeys(9_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedAtTxKeys() {
+ return getKeys(10_000);
+ }
+
+ /**
+ * @return Keys.
+ */
+ private Set<Integer> removedAtTxWithBinaryKeys() {
+ return getKeys(11_000);
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void init(IgniteCache<Integer, TestValue> cache) {
+ Set<Integer> keys = new HashSet<>();
+
+ keys.addAll(createdBeforeTxKeys());
+ keys.addAll(createdBeforeTxWithBinaryKeys());
+ keys.addAll(createdBeforeTxKeys2());
+ keys.addAll(createdBeforeTxWithBinaryKeys2());
+ keys.addAll(createdBeforeTxKeys3());
+ keys.addAll(createdBeforeTxWithBinaryKeys3());
+ keys.addAll(removedBeforeTxKeys());
+ keys.addAll(removedBeforeTxWithBinaryKeys());
+ keys.addAll(removedAtTxKeys());
+ keys.addAll(removedAtTxWithBinaryKeys());
+
+ for (int i : keys)
+ cache.put(i, new TestValue(i));
+
+ for (int i : removedBeforeTxKeys())
+ cache.remove(i);
+
+ for (int i : removedBeforeTxWithBinaryKeys())
+ cache.remove(i);
+ }
+
+ /**
+ * @param cache Cache.
+ */
+ private void initTx(IgniteCache<Integer, TestValue> cache) {
+ for (int i : createdAtTxKeys())
+ cache.put(i, new TestValue(i));
+
+ for (int i : createdAtTxWithBinaryKeys())
+ cache.put(i, new TestValue(i));
+
+ for (int i : removedAtTxKeys())
+ cache.remove(i);
+
+ for (int i : removedAtTxWithBinaryKeys())
+ cache.remove(i);
+ }
+
+ /**
+ * @param e Entry.
+ * @param cache Cache.
+ * @throws Exception If failed.
+ */
+ private void compareVersionWithPrimaryNode(CacheEntry<Integer, ?> e, IgniteCache<Integer, TestValue> cache)
+ throws Exception {
+ CacheConfiguration cfg = cache.getConfiguration(CacheConfiguration.class);
+
+ if (cfg.getCacheMode() != LOCAL) {
+ Ignite prim = primaryNode(e.getKey(), cache.getName());
+
+ GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)prim).internalCache(cache.getName());
+
+ if (cfg.getNearConfiguration() != null)
+ cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
+
+ IgniteCacheObjectProcessor cacheObjects = cacheAdapter.context().cacheObjects();
+
+ CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
+
+ GridCacheMapEntry mapEntry = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(
+ cacheObjCtx, e.getKey(), true));
+
+ assertNotNull("No entry for key: " + e.getKey(), mapEntry);
+ assertEquals(mapEntry.version(), e.version());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @param getVerErr Expected error message prefix if entry version access should fail, {@code null} if it should succeed.
+ * @param expKeys Expected keys with values.
+ * @throws Exception If failed.
+ */
+ private void checkData(IgniteCache<Integer, TestValue> cache,
+ int i,
+ boolean oneEntry,
+ @Nullable String getVerErr,
+ Set<Integer> expKeys) throws Exception {
+ if (oneEntry) {
+ final CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ assertEquals(e.getValue().val, i);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ int expCnt = 0;
+
+ for (int j = 0; j < 10; j++) {
+ Integer key = i + j;
+
+ set.add(key);
+
+ if (expKeys.contains(key))
+ expCnt++;
+ }
+
+ Collection<CacheEntry<Integer, TestValue>> entries = cache.getEntries(set);
+
+ assertEquals(expCnt, entries.size());
+
+ for (final CacheEntry<Integer, TestValue> e : entries) {
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ assertEquals((Integer)e.getValue().val, e.getKey());
+
+ assertTrue(set.contains(e.getValue().val));
+ }
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @param getVerErr Expected error message prefix if entry version access should fail, {@code null} if it should succeed.
+ * @param expKeys Expected keys with values.
+ * @throws Exception If failed.
+ */
+ private void checkBinaryData(IgniteCache<Integer, TestValue> cache,
+ int i,
+ boolean oneEntry,
+ @Nullable String getVerErr,
+ Set<Integer> expKeys) throws Exception {
+ IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
+
+ if (oneEntry) {
+ final CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ assertEquals(((TestValue)e.getValue().deserialize()).val, i);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ int expCnt = 0;
+
+ for (int j = 0; j < 10; j++) {
+ Integer key = i + j;
+
+ set.add(key);
+
+ if (expKeys.contains(key))
+ expCnt++;
+ }
+
+ Collection<CacheEntry<Integer, BinaryObject>> entries = cacheB.getEntries(set);
+
+ assertEquals(expCnt, entries.size());
+
+ for (final CacheEntry<Integer, BinaryObject> e : entries) {
+ if (getVerErr == null)
+ compareVersionWithPrimaryNode(e, cache);
+ else {
+ Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ e.version();
+
+ return null;
+ }
+ }, IgniteException.class, null);
+
+ assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+ }
+
+ TestValue tv = e.getValue().deserialize();
+
+ assertEquals((Integer)tv.val, e.getKey());
+
+ assertTrue(set.contains((tv).val));
+ }
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ */
+ private void checkRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
+ if (oneEntry) {
+ CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+ assertNull(e);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
+
+ assertTrue(es.isEmpty());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param i Key.
+ * @param oneEntry If {@code true} then single entry is tested.
+ */
+ private void checkBinaryRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
+ IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
+
+ if (oneEntry) {
+ CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+ assertNull(e);
+ }
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
+
+ assertTrue(es.isEmpty());
+ }
+ }
+
+ /**
+ * @param cache Cache.
+ * @param txConcurrency Transaction concurrency.
+ * @param txIsolation Transaction isolation.
+ * @param tx Transaction.
+ * @param oneEntry If {@code true} then single entry is tested.
+ * @throws Exception If failed.
+ */
+ private void test(IgniteCache<Integer, TestValue> cache,
+ TransactionConcurrency txConcurrency,
+ TransactionIsolation txIsolation,
+ Transaction tx,
+ boolean oneEntry) throws Exception {
+ if (tx == null) {
+ Set<Integer> keys = createdBeforeTxKeys();
+
+ for (int i : keys)
+ checkData(cache, i, oneEntry, null, keys);
+
+ keys = createdBeforeTxWithBinaryKeys();
+
+ for (int i : keys)
+ checkBinaryData(cache, i, oneEntry, null, keys);
+
+ for (int i : removedBeforeTxKeys())
+ checkRemoved(cache, i, oneEntry);
+
+ for (int i : removedBeforeTxWithBinaryKeys())
+ checkBinaryRemoved(cache, i, oneEntry);
+ }
+ else {
+ Set<Integer> keys = createdBeforeTxKeys2();
+
+ for (int i : keys) {
+ checkData(cache, i, oneEntry, null, keys);
+ checkData(cache, i, oneEntry, null, keys);
+ }
+
+ keys = createdBeforeTxWithBinaryKeys2();
+
+ for (int i : keys) {
+ checkBinaryData(cache, i, oneEntry, null, keys);
+ checkBinaryData(cache, i, oneEntry, null, keys);
+ }
+
+ String verGetErr = null;
+
+ if (txConcurrency == OPTIMISTIC && txIsolation == REPEATABLE_READ)
+ verGetErr = ENTRY_AFTER_GET_ERR;
+
+ keys = createdBeforeTxKeys3();
+
+ for (int i : keys) {
+ if (oneEntry)
+ cache.get(i);
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ cache.getAll(set);
+ }
+
+ checkData(cache, i, oneEntry, verGetErr, keys);
+ }
+
+ keys = createdBeforeTxWithBinaryKeys3();
+
+ for (int i : keys) {
+ if (oneEntry)
+ cache.get(i);
+ else {
+ Set<Integer> set = new HashSet<>();
+
+ for (int j = 0; j < 10; j++)
+ set.add(i + j);
+
+ cache.getAll(set);
+ }
+
+ checkBinaryData(cache, i, oneEntry, verGetErr, keys);
+ }
+
+ keys = createdAtTxKeys();
+
+ for (int i : keys)
+ checkData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys);
+
+ keys = createdAtTxWithBinaryKeys();
+
+ for (int i : keys)
+ checkBinaryData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys);
+
+ for (int i : removedBeforeTxKeys())
+ checkRemoved(cache, i, oneEntry);
+
+ for (int i : removedBeforeTxWithBinaryKeys())
+ checkBinaryRemoved(cache, i, oneEntry);
+
+ for (int i : removedAtTxKeys())
+ checkRemoved(cache, i, oneEntry);
+
+ for (int i : removedAtTxWithBinaryKeys())
+ checkBinaryRemoved(cache, i, oneEntry);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestValue implements Serializable {
+ /** */
+ private int val;
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(int val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public int value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestValue.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
index b7cd28b..acc21df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
/**
* Test getEntry and getEntries methods.
*/
-public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
@Override protected TransactionConcurrency concurrency() {
return TransactionConcurrency.OPTIMISTIC;
}
+ /** {@inheritDoc} */
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.READ_COMMITTED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
index d8453ba..6153869 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
/**
* Test getEntry and getEntries methods.
*/
-public class CacheGetEntryOptimisticRepeatableReadSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryOptimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
@Override protected TransactionConcurrency concurrency() {
return TransactionConcurrency.OPTIMISTIC;
}
+ /** {@inheritDoc} */
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.REPEATABLE_READ;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
index 4fbe66e..6ded4a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
/**
* Test getEntry and getEntries methods.
*/
-public class CacheGetEntryOptimisticSerializableSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryOptimisticSerializableSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
@Override protected TransactionConcurrency concurrency() {
return TransactionConcurrency.OPTIMISTIC;
}
+ /** {@inheritDoc} */
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.SERIALIZABLE;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
index 86d6e4e..975d271 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
/**
* Test getEntry and getEntries methods.
*/
-public class CacheGetEntryPessimisticReadCommittedSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryPessimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
@Override protected TransactionConcurrency concurrency() {
return TransactionConcurrency.PESSIMISTIC;
}
+ /** {@inheritDoc} */
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.READ_COMMITTED;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
index 7fc7aa3..dac64d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
/**
* Test getEntry and getEntries methods.
*/
-public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
@Override protected TransactionConcurrency concurrency() {
return TransactionConcurrency.PESSIMISTIC;
}
+ /** {@inheritDoc} */
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.REPEATABLE_READ;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
index da28eca..70f71ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
/**
* Test getEntry and getEntries methods.
*/
-public class CacheGetEntryPessimisticSerializableSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryPessimisticSerializableSeltTest extends CacheGetEntryAbstractTest {
+ /** {@inheritDoc} */
@Override protected TransactionConcurrency concurrency() {
return TransactionConcurrency.PESSIMISTIC;
}
+ /** {@inheritDoc} */
@Override protected TransactionIsolation isolation() {
return TransactionIsolation.SERIALIZABLE;
}
[03/12] ignite git commit: gc
Posted by sb...@apache.org.
gc
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/861236a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/861236a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/861236a1
Branch: refs/heads/ignite-2224
Commit: 861236a16851317d3c580eb5a72ef8e5f8561b80
Parents: 8a8a8c1
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Jan 29 18:43:10 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Jan 29 18:43:10 2016 +0300
----------------------------------------------------------------------
.../rebalancing/GridCacheRebalancingSyncSelfTest.java | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/861236a1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 4ee080f..e4ad66b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -177,6 +178,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
}
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ GridTestUtils.runGC(); // Clean heap before rebalancing.
+ }
+
/**
* @throws Exception If failed.
*/
[02/12] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main
Posted by sb...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a8a8c17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a8a8c17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a8a8c17
Branch: refs/heads/ignite-2224
Commit: 8a8a8c17602505adb4234bdd440b2ae20ee9a25e
Parents: c2e5638 d2a107b
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jan 29 15:52:38 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Jan 29 15:52:38 2016 +0300
----------------------------------------------------------------------
.../examples/datagrid/CacheAffinityExample.java | 8 +-
.../java8/datagrid/CacheAffinityExample.java | 6 +-
modules/benchmarks/pom.xml | 32 +-
.../benchmarks/jmh/JmhAbstractBenchmark.java | 150 ++++++++++
.../jmh/cache/JmhCacheAbstractBenchmark.java | 181 ++++++++++++
.../jmh/cache/JmhCachePutBenchmark.java | 124 ++++++++
.../benchmarks/jmh/cache/PutBenchmark.java | 170 -----------
.../jmh/runner/JmhIdeBenchmarkRunner.java | 232 +++++++++++++++
.../internal/client/ClientGetAffinityTask.java | 4 +-
.../java/org/apache/ignite/IgniteCluster.java | 7 +-
.../apache/ignite/cache/affinity/Affinity.java | 24 +-
.../affinity/GridAffinityProcessor.java | 60 ++--
.../cache/GridCacheAffinityManager.java | 47 ++-
.../cache/affinity/GridCacheAffinityImpl.java | 48 ++-
.../dht/preloader/GridDhtPartitionDemander.java | 4 +-
.../near/GridNearTxFinishFuture.java | 96 +++---
.../ignite/internal/GridAffinityMappedTest.java | 8 +-
.../internal/GridAffinityNoCacheSelfTest.java | 290 +++++++++++++++++++
.../internal/GridAffinityP2PSelfTest.java | 8 +-
.../ignite/internal/GridAffinitySelfTest.java | 8 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 4 +-
.../cache/GridCacheAffinityRoutingSelfTest.java | 10 +-
.../GridCacheConcurrentTxMultiNodeTest.java | 4 +-
.../GridCacheDaemonNodeAbstractSelfTest.java | 17 +-
.../cache/GridCacheDeploymentSelfTest.java | 8 +-
.../cache/GridCacheEntryMemorySizeSelfTest.java | 6 +-
...hePartitionedProjectionAffinitySelfTest.java | 8 +-
.../cache/GridCachePutAllFailoverSelfTest.java | 4 +-
.../dht/GridCacheDhtMultiBackupTest.java | 4 +-
.../near/GridCacheNearOnlyTopologySelfTest.java | 4 +-
.../near/GridCacheNearTxMultiNodeSelfTest.java | 4 +-
...titionedExplicitLockNodeFailureSelfTest.java | 6 +-
...ridCacheContinuousQueryAbstractSelfTest.java | 10 +-
.../processors/igfs/IgfsStreamsSelfTest.java | 4 +-
.../GridServicePackagePrivateSelfTest.java | 17 +-
.../ignite/loadtests/dsi/GridDsiClient.java | 4 +-
.../tcp/GridCacheDhtLockBackupSelfTest.java | 4 +-
.../testsuites/IgniteComputeGridTestSuite.java | 4 +-
modules/docker/1.5.0.final/Dockerfile | 40 +++
modules/docker/1.5.0.final/run.sh | 50 ++++
modules/docker/Dockerfile | 6 +-
41 files changed, 1343 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
[10/12] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-2224
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-2224
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdafe285
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdafe285
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdafe285
Branch: refs/heads/ignite-2224
Commit: fdafe2858ce784ca7a6fcc2c63b2069431f01755
Parents: e7caaa0 4e61602
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 1 09:39:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 1 09:39:37 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 4 +-
.../processors/cache/CacheOperationContext.java | 43 ++-
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheProxyImpl.java | 11 +-
.../processors/cache/IgniteCacheProxy.java | 43 ++-
.../dht/atomic/GridDhtAtomicCache.java | 103 ++++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../processors/cache/dr/GridCacheDrInfo.java | 49 ++-
.../transactions/IgniteTxLocalAdapter.java | 81 ++--
.../cache/version/GridCacheVersionManager.java | 23 +-
.../GridCacheRebalancingSyncSelfTest.java | 8 +
.../testframework/junits/GridAbstractTest.java | 3 +
modules/kafka/README.txt | 111 +++++-
modules/kafka/pom.xml | 69 ++--
.../ignite/stream/kafka/KafkaStreamer.java | 2 +-
.../kafka/connect/IgniteSinkConnector.java | 91 +++++
.../kafka/connect/IgniteSinkConstants.java | 38 ++
.../stream/kafka/connect/IgniteSinkTask.java | 165 ++++++++
.../kafka/IgniteKafkaStreamerSelfTestSuite.java | 9 +-
.../stream/kafka/KafkaEmbeddedBroker.java | 387 -------------------
.../kafka/KafkaIgniteStreamerSelfTest.java | 13 +-
.../ignite/stream/kafka/SimplePartitioner.java | 53 ---
.../ignite/stream/kafka/TestKafkaBroker.java | 237 ++++++++++++
.../kafka/connect/IgniteSinkConnectorTest.java | 250 ++++++++++++
.../kafka/src/test/resources/example-ignite.xml | 71 ++++
.../commands/top/VisorTopologyCommand.scala | 5 +-
.../apache/ignite/yarn/ApplicationMaster.java | 12 +-
.../apache/ignite/yarn/ClusterProperties.java | 144 ++++---
.../yarn/IgniteApplicationMasterSelfTest.java | 52 +++
parent/pom.xml | 120 +++---
pom.xml | 16 -
31 files changed, 1516 insertions(+), 709 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
[11/12] ignite git commit: Merge remote-tracking branch 'origin/ignite-2224' into ignite-2224
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-2224' into ignite-2224
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ede58b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ede58b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ede58b5
Branch: refs/heads/ignite-2224
Commit: 4ede58b5f12aaaeb5e135a530ddf108437c0bd0e
Parents: fdafe28 eccbd0d
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 1 11:51:48 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 1 11:51:48 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/cache/GridCacheAdapter.java | 2 +-
.../processors/cache/GridCacheInterceptorAbstractSelfTest.java | 6 +++---
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4ede58b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
[06/12] ignite git commit: Revert PR with wrong author.
Posted by sb...@apache.org.
Revert PR with wrong author.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/304370cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/304370cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/304370cf
Branch: refs/heads/ignite-2224
Commit: 304370cf7500f259e7a3a4699dab9f08a7609200
Parents: e5fda53
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Feb 1 10:43:45 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 1 10:43:45 2016 +0700
----------------------------------------------------------------------
.../apache/ignite/visor/commands/top/VisorTopologyCommand.scala | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/304370cf/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
index d2ec662..5e278ed 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
@@ -263,13 +263,12 @@ class VisorTopologyCommand extends VisorConsoleCommand {
val hostsT = VisorTextTable()
- hostsT #= ("Int./Ext. IPs", "Node ID8(@)","Node Type", "OS", "CPUs", "MACs", "CPU Load")
+ hostsT #= ("Int./Ext. IPs", "Node ID8(@)", "OS", "CPUs", "MACs", "CPU Load")
neighborhood.foreach {
case (_, neighbors) =>
var ips = Set.empty[String]
var id8s = List.empty[String]
- var nodeTypes = List.empty[String]
var macs = Set.empty[String]
var cpuLoadSum = 0.0
@@ -288,7 +287,6 @@ class VisorTopologyCommand extends VisorConsoleCommand {
neighbors.foreach(n => {
id8s = id8s :+ (i.toString + ": " + nodeId8(n.id))
- nodeTypes = nodeTypes :+ (if (n.isClient) "Client" else "Server")
i += 1
ips = ips ++ n.addresses()
@@ -302,7 +300,6 @@ class VisorTopologyCommand extends VisorConsoleCommand {
hostsT += (
ips.toSeq,
id8s,
- nodeTypes,
os,
cpus,
macs.toSeq,
[08/12] ignite git commit: Fixed IGNITE-2419 Ignite on YARN do not handle memory overhead. This closes #414.
Posted by sb...@apache.org.
Fixed IGNITE-2419 Ignite on YARN do not handle memory overhead. This closes #414.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1945b988
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1945b988
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1945b988
Branch: refs/heads/ignite-2224
Commit: 1945b98849cb52fc297c726fa79a270706135581
Parents: 5969129
Author: Edouard Chevalier <ed...@techmydata.net>
Authored: Mon Feb 1 07:34:59 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Feb 1 07:34:59 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/yarn/ApplicationMaster.java | 12 +-
.../apache/ignite/yarn/ClusterProperties.java | 144 +++++++++++--------
.../yarn/IgniteApplicationMasterSelfTest.java | 52 +++++++
3 files changed, 140 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1945b988/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 755e4e4..b9ab02d 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -137,8 +137,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
+ "cp -r ./libs/* ./ignite/*/libs/ || true && "
+ "./ignite/*/bin/ignite.sh "
+ "./ignite-config.xml"
- + " -J-Xmx" + c.getResource().getMemory() + "m"
- + " -J-Xms" + c.getResource().getMemory() + "m"
+ + " -J-Xmx" + ((int)props.memoryPerNode()) + "m"
+ + " -J-Xms" + ((int)props.memoryPerNode()) + "m"
+ IgniteYarnUtils.YARN_LOG_OUT
));
@@ -178,7 +178,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
// Check that slave satisfies min requirements.
if (cont.getResource().getVirtualCores() < props.cpusPerNode()
- || cont.getResource().getMemory() < props.memoryPerNode()) {
+ || cont.getResource().getMemory() < props.totalMemoryPerNode()) {
log.log(Level.FINE, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}",
new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
cont.getResource().getMemory()});
@@ -291,7 +291,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
// Resource requirements for worker containers.
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory((int)props.memoryPerNode());
+ capability.setMemory((int)props.totalMemoryPerNode());
capability.setVirtualCores((int)props.cpusPerNode());
for (int i = 0; i < props.instances() - runningCnt; ++i) {
@@ -302,7 +302,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
rmClient.addContainerRequest(containerAsk);
log.log(Level.INFO, "Making request. Memory: {0}, cpu {1}.",
- new Object[]{props.memoryPerNode(), props.cpusPerNode()});
+ new Object[]{props.totalMemoryPerNode(), props.cpusPerNode()});
}
}
@@ -329,7 +329,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
private boolean checkAvailableResource() {
Resource availableRes = rmClient.getAvailableResources();
- return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
+ return availableRes == null || availableRes.getMemory() >= props.totalMemoryPerNode()
&& availableRes.getVirtualCores() >= props.cpusPerNode();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1945b988/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index d040e9f..647aef2 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -48,6 +48,12 @@ public class ClusterProperties {
/** */
public static final double DEFAULT_MEM_PER_NODE = 2048;
+
+ /**
+ * The minimum memory overhead: overhead is by default 0.1* MEMORY_PER_NODE,
+ * with a minimum of DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE.
+ */
+ public static final double DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE = 384;
/** Cluster name. */
private String clusterName = DEFAULT_CLUSTER_NAME;
@@ -63,6 +69,12 @@ public class ClusterProperties {
/** Memory limit. */
private double memPerNode = DEFAULT_MEM_PER_NODE;
+
+ /** */
+ public static final String IGNITE_MEMORY_OVERHEAD_PER_NODE = "IGNITE_MEMORY_OVERHEAD_PER_NODE";
+
+ /** Memory over head to request yarn. */
+ private double memOverHeadPerNode = 0;
/** */
public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
@@ -185,7 +197,30 @@ public class ClusterProperties {
}
/**
- * @return instance count limit.
+ * @return Memory overhead for requested memory.
+ */
+ public double memoryOverHeadPerNode() {
+ return memOverHeadPerNode;
+ }
+
+ /**
+ * Sets memory overhead requested to YARN.
+ *
+ * @param memOverHeadPerNode Memory over head per node.
+ */
+ public void memoryOverHeadPerNode(double memOverHeadPerNode) {
+ this.memOverHeadPerNode = memOverHeadPerNode;
+ }
+
+ /**
+ * @return Provide the total memory requested to ResourceManagers (memoryPerNode + memoryOverheadPerNode).
+ */
+ public double totalMemoryPerNode(){
+ return memoryPerNode() + memoryOverHeadPerNode();
+ }
+
+ /**
+ * @return Instance count limit.
*/
public double instances() {
return nodeCnt;
@@ -278,7 +313,50 @@ public class ClusterProperties {
}
/**
- * @param config path to config file.
+ * Instantiate a ClusterProperties from a set of properties.
+ *
+ * @param props If {@code null} will be used system properties.
+ * @return Cluster properties.
+ */
+ private static ClusterProperties fromProperties(Properties props) {
+ ClusterProperties prop = new ClusterProperties();
+
+ prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
+
+ prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, DEFAULT_CPU_PER_NODE);
+ prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, DEFAULT_MEM_PER_NODE);
+ // The minimum memory overhead: overhead is by default 0.1* MEMORY_PER_NODE,
+ // with a minimum of DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE
+ prop.memOverHeadPerNode = getDoubleProperty(IGNITE_MEMORY_OVERHEAD_PER_NODE, props,
+ Math.max( 0.1 * prop.memPerNode, DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE));
+ prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, DEFAULT_IGNITE_NODE_COUNT);
+
+ prop.igniteUrl = getStringProperty(IGNITE_URL, props, null);
+ prop.ignitePath = getStringProperty(IGNITE_PATH, props, null);
+ prop.licencePath = getStringProperty(LICENCE_PATH, props, null);
+ prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, props, null);
+ prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+ prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+ prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
+ prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
+ prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
+
+ String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null);
+
+ if (pattern != null) {
+ try {
+ prop.hostnameConstraint = Pattern.compile(pattern);
+ }
+ catch (PatternSyntaxException e) {
+ log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
+ }
+ }
+
+ return prop;
+ }
+
+ /**
+ * @param config Path to config file.
* @return Cluster configuration.
*/
public static ClusterProperties from(String config) {
@@ -291,36 +369,7 @@ public class ClusterProperties {
props.load(new FileInputStream(config));
}
- ClusterProperties prop = new ClusterProperties();
-
- prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
-
- prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, DEFAULT_CPU_PER_NODE);
- prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, DEFAULT_MEM_PER_NODE);
- prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, DEFAULT_IGNITE_NODE_COUNT);
-
- prop.igniteUrl = getStringProperty(IGNITE_URL, props, null);
- prop.ignitePath = getStringProperty(IGNITE_PATH, props, null);
- prop.licencePath = getStringProperty(LICENCE_PATH, props, null);
- prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, props, null);
- prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
- prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
- prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
- prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
- prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
-
- String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null);
-
- if (pattern != null) {
- try {
- prop.hostnameConstraint = Pattern.compile(pattern);
- }
- catch (PatternSyntaxException e) {
- log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
- }
- }
-
- return prop;
+ return fromProperties(props);
}
catch (IOException e) {
throw new RuntimeException(e);
@@ -331,36 +380,7 @@ public class ClusterProperties {
* @return Cluster configuration.
*/
public static ClusterProperties from() {
- ClusterProperties prop = new ClusterProperties();
-
- prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
-
- prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, DEFAULT_CPU_PER_NODE);
- prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, DEFAULT_MEM_PER_NODE);
- prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, DEFAULT_IGNITE_NODE_COUNT);
-
- prop.igniteUrl = getStringProperty(IGNITE_URL, null, null);
- prop.ignitePath = getStringProperty(IGNITE_PATH, null, null);
- prop.licencePath = getStringProperty(LICENCE_PATH, null, null);
- prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, null, null);
- prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
- prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, null, DEFAULT_IGNITE_LOCAL_WORK_DIR);
- prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, null, DEFAULT_IGNITE_RELEASES_DIR);
- prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
- prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
-
- String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, null, null);
-
- if (pattern != null) {
- try {
- prop.hostnameConstraint = Pattern.compile(pattern);
- }
- catch (PatternSyntaxException e) {
- log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
- }
- }
-
- return prop;
+ return fromProperties(null);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/1945b988/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
index 97f6a12..1190313 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -103,6 +103,58 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
assertEquals(1024, req.getCapability().getMemory());
}
}
+
+ /**
+ * Tests whether memory overhead is allocated within container memory.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMemoryOverHeadAllocation() throws Exception {
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ props.cpusPerNode(2);
+ props.memoryPerNode(1024);
+ props.memoryOverHeadPerNode(512);
+ props.instances(3);
+
+ Thread thread = runAppMaster(appMaster);
+
+ List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+ interruptedThread(thread);
+
+ assertEquals(3, contRequests.size());
+
+ for (AMRMClient.ContainerRequest req : contRequests) {
+ assertEquals(2, req.getCapability().getVirtualCores());
+ assertEquals(1024 + 512, req.getCapability().getMemory());
+ }
+ }
+
+ /**
+ * Tests whether memory overhead prevents from allocating container.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMemoryOverHeadPreventAllocation() throws Exception {
+ rmMock.availableRes(new MockResource(1024, 2));
+ appMaster.setRmClient(rmMock);
+ appMaster.setNmClient(new NMMock());
+
+ props.cpusPerNode(2);
+ props.memoryPerNode(1024);
+ props.memoryOverHeadPerNode(512);
+ props.instances(3);
+
+ Thread thread = runAppMaster(appMaster);
+
+ List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+ interruptedThread(thread);
+
+ assertEquals(0, contRequests.size());
+ }
/**
* @throws Exception If failed.
[05/12] ignite git commit: IGNITE-1069 Added output of node type (server or client) in Visor commandline top command.
Posted by sb...@apache.org.
IGNITE-1069 Added output of node type (server or client) in Visor commandline top command.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5fda53a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5fda53a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5fda53a
Branch: refs/heads/ignite-2224
Commit: e5fda53ae5f0e64e63f97db9d664081d24b34fa7
Parents: c92c274
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Feb 1 10:08:21 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 1 10:08:21 2016 +0700
----------------------------------------------------------------------
.../apache/ignite/visor/commands/top/VisorTopologyCommand.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fda53a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
index 5e278ed..d2ec662 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
@@ -263,12 +263,13 @@ class VisorTopologyCommand extends VisorConsoleCommand {
val hostsT = VisorTextTable()
- hostsT #= ("Int./Ext. IPs", "Node ID8(@)", "OS", "CPUs", "MACs", "CPU Load")
+ hostsT #= ("Int./Ext. IPs", "Node ID8(@)","Node Type", "OS", "CPUs", "MACs", "CPU Load")
neighborhood.foreach {
case (_, neighbors) =>
var ips = Set.empty[String]
var id8s = List.empty[String]
+ var nodeTypes = List.empty[String]
var macs = Set.empty[String]
var cpuLoadSum = 0.0
@@ -287,6 +288,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
neighbors.foreach(n => {
id8s = id8s :+ (i.toString + ": " + nodeId8(n.id))
+ nodeTypes = nodeTypes :+ (if (n.isClient) "Client" else "Server")
i += 1
ips = ips ++ n.addresses()
@@ -300,6 +302,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
hostsT += (
ips.toSeq,
id8s,
+ nodeTypes,
os,
cpus,
macs.toSeq,
[09/12] ignite git commit: Fixed conflict resolver API.
Posted by sb...@apache.org.
Fixed conflict resolver API.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e61602e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e61602e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e61602e
Branch: refs/heads/ignite-2224
Commit: 4e61602eca679bf3689bb23f2bc1c9e58b4eb8dc
Parents: 1945b98
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jan 25 19:16:47 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Feb 1 07:43:59 2016 +0300
----------------------------------------------------------------------
.../processors/cache/CacheOperationContext.java | 43 +++++--
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheProxyImpl.java | 11 +-
.../processors/cache/IgniteCacheProxy.java | 43 ++++++-
.../dht/atomic/GridDhtAtomicCache.java | 104 +++++++++++++----
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../processors/cache/dr/GridCacheDrInfo.java | 49 +++++++-
.../transactions/IgniteTxLocalAdapter.java | 81 +++++++++-----
.../cache/version/GridCacheVersionManager.java | 23 ++--
.../testframework/junits/GridAbstractTest.java | 3 +
parent/pom.xml | 111 +++++++++++--------
pom.xml | 16 ---
12 files changed, 351 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 21934d0..f39a09d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -48,6 +48,9 @@ public class CacheOperationContext implements Serializable {
/** Expiry policy. */
private final ExpiryPolicy expiryPlc;
+ /** Data center Id. */
+ private final Byte dataCenterId;
+
/**
* Constructor with default values.
*/
@@ -61,6 +64,8 @@ public class CacheOperationContext implements Serializable {
expiryPlc = null;
noRetries = false;
+
+ dataCenterId = null;
}
/**
@@ -68,13 +73,15 @@ public class CacheOperationContext implements Serializable {
* @param subjId Subject ID.
* @param keepBinary Keep binary flag.
* @param expiryPlc Expiry policy.
+ * @param dataCenterId Data center id.
*/
public CacheOperationContext(
boolean skipStore,
@Nullable UUID subjId,
boolean keepBinary,
@Nullable ExpiryPolicy expiryPlc,
- boolean noRetries) {
+ boolean noRetries,
+ @Nullable Byte dataCenterId) {
this.skipStore = skipStore;
this.subjId = subjId;
@@ -84,6 +91,8 @@ public class CacheOperationContext implements Serializable {
this.expiryPlc = expiryPlc;
this.noRetries = noRetries;
+
+ this.dataCenterId = dataCenterId;
}
/**
@@ -94,6 +103,13 @@ public class CacheOperationContext implements Serializable {
}
/**
+ * @return {@code True} if data center id is set otherwise {@code false}.
+ */
+ public boolean hasDataCenterId() {
+ return dataCenterId != null;
+ }
+
+ /**
* See {@link IgniteInternalCache#keepBinary()}.
*
* @return New instance of CacheOperationContext with keep binary flag.
@@ -104,7 +120,8 @@ public class CacheOperationContext implements Serializable {
subjId,
true,
expiryPlc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -117,6 +134,15 @@ public class CacheOperationContext implements Serializable {
}
/**
+ * Gets data center ID.
+ *
+ * @return Client ID.
+ */
+ @Nullable public Byte dataCenterId() {
+ return dataCenterId;
+ }
+
+ /**
* See {@link IgniteInternalCache#forSubjectId(UUID)}.
*
* @param subjId Subject id.
@@ -128,7 +154,8 @@ public class CacheOperationContext implements Serializable {
subjId,
keepBinary,
expiryPlc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -150,7 +177,8 @@ public class CacheOperationContext implements Serializable {
subjId,
keepBinary,
expiryPlc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -172,7 +200,8 @@ public class CacheOperationContext implements Serializable {
subjId,
true,
plc,
- noRetries);
+ noRetries,
+ dataCenterId);
}
/**
@@ -185,8 +214,8 @@ public class CacheOperationContext implements Serializable {
subjId,
keepBinary,
expiryPlc,
- noRetries
- );
+ noRetries,
+ dataCenterId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3081cfb..9fd65e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -447,7 +447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
- CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false);
+ CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
@@ -459,14 +459,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
- CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false);
+ CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
/** {@inheritDoc} */
@Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
- CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false);
+ CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
}
@@ -483,7 +483,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert !CU.isAtomicsCache(ctx.name());
assert !CU.isMarshallerCache(ctx.name());
- CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false);
+ CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 8ffd273..3a53942 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -209,7 +209,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false));
+ opCtx != null ? opCtx.forSubjectId(subjId) :
+ new CacheOperationContext(false, subjId, false, null, false, null));
}
/** {@inheritDoc} */
@@ -221,7 +222,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
return this;
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false));
+ opCtx != null ? opCtx.setSkipStore(skipStore) :
+ new CacheOperationContext(true, null, false, null, false, null));
}
finally {
gate.leave(prev);
@@ -236,7 +238,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx,
(GridCacheAdapter<K1, V1>)delegate,
- opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false));
+ opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false, null));
}
/** {@inheritDoc} */
@@ -1608,7 +1610,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
try {
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false));
+ opCtx != null ? opCtx.withExpiryPolicy(plc) :
+ new CacheOperationContext(false, null, false, plc, false, null));
}
finally {
gate.leave(prev);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b64c69c..9e66d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -307,7 +307,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
try {
CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
- new CacheOperationContext(false, null, false, plc, false);
+ new CacheOperationContext(false, null, false, plc, false, null);
return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
}
@@ -339,7 +339,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
return this;
CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) :
- new CacheOperationContext(false, null, false, null, true);
+ new CacheOperationContext(false, null, false, null, true, null);
return new IgniteCacheProxy<>(ctx,
delegate,
@@ -1788,7 +1788,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
opCtx != null ? opCtx.subjectId() : null,
true,
opCtx != null ? opCtx.expiry() : null,
- opCtx != null && opCtx.noRetries());
+ opCtx != null && opCtx.noRetries(),
+ opCtx != null ? opCtx.dataCenterId() : null);
return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
(GridCacheAdapter<K1, V1>)delegate,
@@ -1802,6 +1803,39 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/**
+ * @return Projection for data center id.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteCache<K, V> withDataCenterId(byte dataCenterId) {
+ CacheOperationContext prev = onEnter(gate, opCtx);
+
+ try {
+ Byte prevDataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+ if (prevDataCenterId != null && dataCenterId == prevDataCenterId)
+ return this;
+
+ CacheOperationContext opCtx0 =
+ new CacheOperationContext(
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null ? opCtx.subjectId() : null,
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null ? opCtx.expiry() : null,
+ opCtx != null && opCtx.noRetries(),
+ dataCenterId);
+
+ return new IgniteCacheProxy<>(ctx,
+ delegate,
+ opCtx0,
+ isAsync(),
+ lock);
+ }
+ finally {
+ onLeave(gate, prev);
+ }
+ }
+
+ /**
* @return Cache with skip store enabled.
*/
public IgniteCache<K, V> skipStore() {
@@ -1820,7 +1854,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
opCtx != null ? opCtx.subjectId() : null,
opCtx != null && opCtx.isKeepBinary(),
opCtx != null ? opCtx.expiry() : null,
- opCtx != null && opCtx.noRetries());
+ opCtx != null && opCtx.noRetries(),
+ opCtx != null ? opCtx.dataCenterId() : null);
return new IgniteCacheProxy<>(ctx,
delegate,
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aa79cfa..6b23550 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -99,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -448,7 +448,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
false,
filter,
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -464,7 +465,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
filter,
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -479,7 +481,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
false,
ctx.noValArray(),
- false).get();
+ false,
+ UPDATE).get();
}
/** {@inheritDoc} */
@@ -571,7 +574,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
true,
true,
ctx.equalsValArray(oldVal),
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -589,7 +593,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
CU.empty0(),
- true).chain(RET2NULL);
+ true,
+ UPDATE).chain(RET2NULL);
}
/** {@inheritDoc} */
@@ -610,7 +615,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ UPDATE);
}
/** {@inheritDoc} */
@@ -790,7 +796,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ TRANSFORM);
return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
@Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
@@ -846,7 +853,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ TRANSFORM);
return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() {
@Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException {
@@ -882,7 +890,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
false,
false,
null,
- true);
+ true,
+ TRANSFORM);
}
/**
@@ -901,15 +910,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("ConstantConditions")
private IgniteInternalFuture updateAllAsync0(
- @Nullable final Map<? extends K, ? extends V> map,
- @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+ @Nullable Map<? extends K, ? extends V> map,
+ @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
@Nullable Object[] invokeArgs,
- @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
- @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
final boolean retval,
final boolean rawRetval,
@Nullable final CacheEntryPredicate[] filter,
- final boolean waitTopFut
+ final boolean waitTopFut,
+ final GridCacheOperation op
) {
assert ctx.updatesAllowed();
@@ -918,7 +928,47 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(SecurityPermission.CACHE_PUT);
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert conflictPutMap == null : conflictPutMap;
+ assert conflictRmvMap == null : conflictRmvMap;
+
+ if (op == GridCacheOperation.TRANSFORM) {
+ assert invokeMap != null : invokeMap;
+
+ conflictPutMap = F.viewReadOnly((Map)invokeMap,
+ new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
+ @Override public GridCacheDrInfo apply(EntryProcessor o) {
+ return new GridCacheDrInfo(o, ctx.versions().next(opCtx.dataCenterId()));
+ }
+ });
+
+ invokeMap = null;
+ }
+ else if (op == GridCacheOperation.DELETE) {
+ assert map != null : map;
+
+ conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() {
+ @Override public GridCacheVersion apply(V o) {
+ return ctx.versions().next(opCtx.dataCenterId());
+ }
+ });
+
+ map = null;
+ }
+ else {
+ assert map != null : map;
+
+ conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() {
+ @Override public GridCacheDrInfo apply(V o) {
+ return new GridCacheDrInfo(ctx.toCacheObject(o), ctx.versions().next(opCtx.dataCenterId()));
+ }
+ });
+
+ map = null;
+ }
+ }
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
@@ -928,7 +978,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx,
this,
ctx.config().getWriteSynchronizationMode(),
- invokeMap != null ? TRANSFORM : UPDATE,
+ op,
map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
conflictPutMap.keySet() : conflictRmvMap.keySet(),
map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
@@ -966,8 +1016,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Completion future.
*/
private IgniteInternalFuture removeAllAsync0(
- @Nullable final Collection<? extends K> keys,
- @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictMap,
+ @Nullable Collection<? extends K> keys,
+ @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap,
final boolean retval,
boolean rawRetval,
@Nullable final CacheEntryPredicate[] filter
@@ -985,12 +1035,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ final CacheOperationContext opCtx = ctx.operationContextPerCall();
UUID subjId = ctx.subjectIdPerCall(null, opCtx);
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+ Collection<GridCacheVersion> drVers = null;
+
+ if (opCtx != null && keys != null && opCtx.hasDataCenterId()) {
+ assert conflictMap == null : conflictMap;
+
+ drVers = F.transform(keys, new C1<K, GridCacheVersion>() {
+ @Override public GridCacheVersion apply(K k) {
+ return ctx.versions().next(opCtx.dataCenterId());
+ }
+ });
+ }
+
final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
ctx,
this,
@@ -1000,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
null,
null,
- keys != null ? null : conflictMap.values(),
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
retval,
rawRetval,
(filter != null && opCtx != null) ? opCtx.expiry() : null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3c86083..c9e1a11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1034,7 +1034,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
else if (conflictPutVals != null) {
GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
- val = conflictPutVal.value();
+ val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
@@ -1142,7 +1142,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Conflict PUT.
GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
- val = conflictPutVal.value();
+ val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 8635fe2..02bc6b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable {
/** Value. */
private CacheObject val;
+ /** Entry processor. */
+ private EntryProcessor proc;
+
/** DR version. */
private GridCacheVersion ver;
@@ -61,6 +65,29 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * Constructor.
+ *
+ * @param ver Version.
+ */
+ public GridCacheDrInfo(GridCacheVersion ver) {
+ this.ver = ver;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param proc Entry processor.
+ * @param ver Version.
+ */
+ public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
+ assert proc != null;
+ assert ver != null;
+
+ this.proc = proc;
+ this.ver = ver;
+ }
+
+ /**
* @return Value.
*/
public CacheObject value() {
@@ -68,6 +95,20 @@ public class GridCacheDrInfo implements Externalizable {
}
/**
+ * @return Entry processor.
+ */
+ public EntryProcessor entryProcessor() {
+ return proc;
+ }
+
+ /**
+ * @return Value (entry processor or cache object).
+ */
+ public Object valueEx() {
+ return val == null ? proc : val;
+ }
+
+ /**
* @return Version.
*/
public GridCacheVersion version() {
@@ -88,13 +129,13 @@ public class GridCacheDrInfo implements Externalizable {
return CU.EXPIRE_TIME_ETERNAL;
}
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
assert false;
}
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
assert false;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 926eaf2..aad9841 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1977,8 +1977,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
GridCacheContext cacheCtx,
Map<KeyCacheObject, GridCacheDrInfo> drMap
) {
+ Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+ @Override public Object apply(GridCacheDrInfo val) {
+ return val.value();
+ }
+ });
+
return this.<Object, Object>putAllAsync0(cacheCtx,
- null,
+ map,
null,
null,
drMap,
@@ -2055,7 +2061,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final GridCacheReturn ret,
boolean skipStore,
final boolean singleRmv,
- boolean keepBinary) {
+ boolean keepBinary,
+ Byte dataCenterId) {
try {
addActiveCache(cacheCtx);
@@ -2066,6 +2073,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (entryProcessor != null)
transform = true;
+ GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
boolean loadMissed = enlistWriteEntry(cacheCtx,
cacheKey,
val,
@@ -2075,7 +2084,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
retval,
lockOnly,
filter,
- /*drVer*/null,
+ /*drVer*/drVer,
/*drTtl*/-1L,
/*drExpireTime*/-1L,
ret,
@@ -2125,6 +2134,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* @param drRmvMap DR remove map (optional).
* @param skipStore Skip store flag.
* @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+ * @param keepBinary Keep binary flag.
+ * @param dataCenterId Optional data center ID.
* @return Future for missing values loading.
*/
private <K, V> IgniteInternalFuture<Void> enlistWrite(
@@ -2143,7 +2154,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
boolean skipStore,
final boolean singleRmv,
- final boolean keepBinary
+ final boolean keepBinary,
+ Byte dataCenterId
) {
assert retval || invokeMap == null;
@@ -2197,6 +2209,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
drTtl = -1L;
drExpireTime = -1L;
}
+ else if (dataCenterId != null) {
+ drVer = cctx.versions().next(dataCenterId);
+ drTtl = -1L;
+ drExpireTime = -1L;
+ }
else {
drVer = null;
drTtl = -1L;
@@ -2938,6 +2955,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+ final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
@@ -2955,7 +2974,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
ret,
opCtx != null && opCtx.skipStore(),
/*singleRmv*/false,
- keepBinary);
+ keepBinary,
+ dataCenterId);
if (pessimistic()) {
assert loadFut == null || loadFut.isDone() : loadFut;
@@ -3053,7 +3073,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@Nullable Map<? extends K, ? extends V> map,
@Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
@Nullable final Object[] invokeArgs,
- @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
+ @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
final boolean retval,
@Nullable final CacheEntryPredicate[] filter
) {
@@ -3066,25 +3086,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return new GridFinishedFuture(e);
}
- // Cached entry may be passed only from entry wrapper.
- final Map<?, ?> map0;
- final Map<?, EntryProcessor<K, V, Object>> invokeMap0;
+ final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
- if (drMap != null) {
- assert map == null;
+ final Byte dataCenterId;
- map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
- @Override public Object apply(GridCacheDrInfo val) {
- return val.value();
- }
- });
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+ assert map != null || invokeMap != null;
- invokeMap0 = null;
- }
- else {
- map0 = map;
- invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+ dataCenterId = opCtx.dataCenterId();
}
+ else
+ dataCenterId = null;
+
+ // Cached entry may be passed only from entry wrapper.
+ final Map<?, ?> map0 = map;
+ final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
if (log.isDebugEnabled())
log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
@@ -3110,8 +3127,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
final IgniteInternalFuture<Void> loadFut = enlistWrite(
@@ -3130,7 +3145,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
null,
opCtx != null && opCtx.skipStore(),
false,
- keepBinary);
+ keepBinary,
+ dataCenterId);
if (pessimistic()) {
assert loadFut == null || loadFut.isDone() : loadFut;
@@ -3334,6 +3350,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
else
keys0 = keys;
+ CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+ final Byte dataCenterId;
+
+ if (opCtx != null && opCtx.hasDataCenterId()) {
+ assert drMap == null : drMap;
+
+ dataCenterId = opCtx.dataCenterId();
+ }
+ else
+ dataCenterId = null;
+
assert keys0 != null;
if (log.isDebugEnabled()) {
@@ -3367,8 +3395,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final Collection<KeyCacheObject> enlisted = new ArrayList<>();
- CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
ExpiryPolicy plc;
if (!F.isEmpty(filter))
@@ -3394,7 +3420,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
drMap,
opCtx != null && opCtx.skipStore(),
singleRmv,
- keepBinary
+ keepBinary,
+ dataCenterId
);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 68d03cd..166c713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on current topology.
*/
public GridCacheVersion next() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, false);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
+ }
+
+ /**
+ * @param dataCenterId Data center id.
+ * @return Next version based on current topology with given data center id.
+ */
+ public GridCacheVersion next(byte dataCenterId) {
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
}
/**
@@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given topology version.
*/
public GridCacheVersion next(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, false);
+ return next(topVer.topologyVersion(), true, false, dataCenterId);
}
/**
@@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad() {
- return next(cctx.kernalContext().discovery().topologyVersion(), true, true);
+ return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId);
}
/**
@@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
- return next(topVer.topologyVersion(), true, true);
+ return next(topVer.topologyVersion(), true, true, dataCenterId);
}
/**
@@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version for cache store operations.
*/
public GridCacheVersion nextForLoad(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, true);
+ return next(ver.topologyVersion(), false, true, dataCenterId);
}
/**
@@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @return Next version based on given cache version.
*/
public GridCacheVersion next(GridCacheVersion ver) {
- return next(ver.topologyVersion(), false, false);
+ return next(ver.topologyVersion(), false, false, dataCenterId);
}
/**
@@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
* @param topVer Topology version for which new version should be obtained.
* @param addTime If {@code true} then adds to the given topology version number of seconds
* from the start time of the first grid node.
+ * @param dataCenterId Data center id.
* @return New lock order.
*/
- private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) {
+ private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) {
if (topVer == -1)
topVer = cctx.kernalContext().discovery().topologyVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 99d1a42..8bf877a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1026,6 +1026,9 @@ public abstract class GridAbstractTest extends TestCase {
protected IgniteConfiguration loadConfiguration(String springCfgPath) throws IgniteCheckedException {
URL cfgLocation = U.resolveIgniteUrl(springCfgPath);
+ if (cfgLocation == null)
+ cfgLocation = U.resolveIgniteUrl(springCfgPath, false);
+
assert cfgLocation != null;
ApplicationContext springCtx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 21d8c69..c0f49c8 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -214,13 +214,6 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-apache-license-gen</artifactId>
- <version>${project.version}</version>
- <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor -->
- </dependency>
</dependencies>
<build>
@@ -715,48 +708,6 @@
</execution>
</executions>
</plugin>
-
- <plugin><!-- generates dependencies licenses -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-remote-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>ignite-dependencies</id>
- <goals>
- <goal>process</goal>
- </goals>
- <configuration>
- <resourceBundles>
- <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}</resourceBundle>
- </resourceBundles>
- <excludeTransitive>true</excludeTransitive>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <id>licenses-file-rename</id>
- <goals>
- <goal>run</goal>
- </goals>
- <phase>compile</phase>
- <configuration>
- <target>
- <!-- moving licenses generated by "ignite-dependencies" -->
- <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/>
- </target>
- <failOnError>false</failOnError>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
</plugins>
</build>
@@ -998,5 +949,67 @@
</dependency>
</dependencies>
</profile>
+
+ <profile>
+ <id>release</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-apache-license-gen</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor -->
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin><!-- generates dependencies licenses -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>ignite-dependencies</id>
+ <goals>
+ <goal>process</goal>
+ </goals>
+ <configuration>
+ <resourceBundles>
+ <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}
+ </resourceBundle>
+ </resourceBundles>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>licenses-file-rename</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>compile</phase>
+ <configuration>
+ <target>
+ <!-- moving licenses generated by "ignite-dependencies" -->
+ <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/>
+ </target>
+ <failOnError>false</failOnError>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4f797e8..bead3ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -918,22 +918,6 @@
</execution>
</executions>
</plugin>
-
- <plugin><!-- skipping generates dependencies licenses -->
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-remote-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>ignite-dependencies</id>
- <goals>
- <goal>process</goal>
- </goals>
- <configuration>
- <skip>true</skip>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
</project>
[07/12] ignite git commit: IGNITE-1069 Added output of node type
(server or client) in Visor commandline top command - Fixes #440.
Posted by sb...@apache.org.
IGNITE-1069 Added output of node type (server or client) in Visor commandline top command - Fixes #440.
Signed-off-by: Alexey Kuznetsov <ak...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59691291
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59691291
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59691291
Branch: refs/heads/ignite-2224
Commit: 59691291b55265e561ee5b707da8d95d2eee79db
Parents: 304370c
Author: kcheng <kc...@gmail.com>
Authored: Mon Feb 1 10:48:49 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 1 10:48:49 2016 +0700
----------------------------------------------------------------------
.../apache/ignite/visor/commands/top/VisorTopologyCommand.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/59691291/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
index 5e278ed..d2ec662 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
@@ -263,12 +263,13 @@ class VisorTopologyCommand extends VisorConsoleCommand {
val hostsT = VisorTextTable()
- hostsT #= ("Int./Ext. IPs", "Node ID8(@)", "OS", "CPUs", "MACs", "CPU Load")
+ hostsT #= ("Int./Ext. IPs", "Node ID8(@)","Node Type", "OS", "CPUs", "MACs", "CPU Load")
neighborhood.foreach {
case (_, neighbors) =>
var ips = Set.empty[String]
var id8s = List.empty[String]
+ var nodeTypes = List.empty[String]
var macs = Set.empty[String]
var cpuLoadSum = 0.0
@@ -287,6 +288,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
neighbors.foreach(n => {
id8s = id8s :+ (i.toString + ": " + nodeId8(n.id))
+ nodeTypes = nodeTypes :+ (if (n.isClient) "Client" else "Server")
i += 1
ips = ips ++ n.addresses()
@@ -300,6 +302,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
hostsT += (
ips.toSeq,
id8s,
+ nodeTypes,
os,
cpus,
macs.toSeq,
[04/12] ignite git commit: IGNITE-2016: Kafka Connect integration -
reflected review comments (avoiding setting same task parameters more than
once). - Fixes #335.
Posted by sb...@apache.org.
IGNITE-2016: Kafka Connect integration - reflected review comments (avoiding setting same task parameters more than once). - Fixes #335.
Signed-off-by: shtykh_roman <rs...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c92c2747
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c92c2747
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c92c2747
Branch: refs/heads/ignite-2224
Commit: c92c274737391546b3dfe3ccbe527329c462d95f
Parents: 861236a
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Feb 1 10:35:02 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Mon Feb 1 10:35:02 2016 +0900
----------------------------------------------------------------------
modules/kafka/README.txt | 111 +++++-
modules/kafka/pom.xml | 69 ++--
.../ignite/stream/kafka/KafkaStreamer.java | 2 +-
.../kafka/connect/IgniteSinkConnector.java | 91 +++++
.../kafka/connect/IgniteSinkConstants.java | 38 ++
.../stream/kafka/connect/IgniteSinkTask.java | 165 ++++++++
.../kafka/IgniteKafkaStreamerSelfTestSuite.java | 9 +-
.../stream/kafka/KafkaEmbeddedBroker.java | 387 -------------------
.../kafka/KafkaIgniteStreamerSelfTest.java | 13 +-
.../ignite/stream/kafka/SimplePartitioner.java | 53 ---
.../ignite/stream/kafka/TestKafkaBroker.java | 237 ++++++++++++
.../kafka/connect/IgniteSinkConnectorTest.java | 250 ++++++++++++
.../kafka/src/test/resources/example-ignite.xml | 71 ++++
parent/pom.xml | 9 +-
14 files changed, 1011 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index 1eaf861..f4e56bd 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -1,16 +1,17 @@
Apache Ignite Kafka Streamer Module
-------------------------
+-----------------------------------
Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
-To enable Kafka Streamer module when starting a standalone node, move 'optional/ignite-Kafka' folder to
-'libs' folder before running 'ignite.{sh|bat}' script. The content of the module folder will
-be added to classpath in this case.
+There are two ways this can be achieved:
+- importing Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming;
+- using Kafka Connect functionality.
-Importing Ignite Kafka Streamer Module In Maven Project
--------------------------------------
+Below are the details.
-If you are using Maven to manage dependencies of your project, you can add JCL module
+## Importing Ignite Kafka Streamer Module In Maven Project
+
+If you are using Maven to manage dependencies of your project, you can add Kafka module
dependency like this (replace '${ignite.version}' with actual Ignite version you are
interested in):
@@ -30,3 +31,99 @@ interested in):
</dependencies>
...
</project>
+
+
+## Streaming Data via Kafka Connect
+
+Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
+For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
+
+The connector can be found in 'optional/ignite-kafka'. It and its dependencies have to be on the classpath of a running Kafka instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-1.5.0-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=string-ignite-connector
+connector.class=IgniteSinkConnector
+tasks.max=2
+topics=testTopic1,testTopic2
+
+# cache
+cacheName=cache1
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml'; the data from 'testTopic1,testTopic2'
+will be pulled and stored in that cache. 'cacheAllowOverwrite' is set to true if you want to enable overwriting existing values in cache.
+You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust per-node buffer and the maximum number
+of parallel stream operations for a single node.
+
+See example-ignite.xml in tests for a simple cache configuration file example.
+
+4. Start connector, for instance, as follows,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+## Checking the Flow
+
+To perform a very basic functionality check, you can do the following,
+
+1. Start Zookeeper
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+2. Start Kafka server
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Provide some data input to the Kafka server
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+```
+
+4. Start the connector. For example,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+5. Check the value is in the cache. For example, via REST,
+```
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+```
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index e00b190..0ac0487 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -20,7 +20,8 @@
<!--
POM file.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -43,48 +44,28 @@
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
+ <artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-log4j</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-runtime</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm-all</artifactId>
- <version>${asm.version}</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -96,11 +77,33 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>${easymock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index cbc5b1b..487c369 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -224,4 +224,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
new file mode 100644
index 0000000..9385920
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Sink connector to manage sink tasks that transfer Kafka data to Ignite grid.
+ */
+public class IgniteSinkConnector extends SinkConnector {
+ /** Sink properties. */
+ private Map<String, String> configProps;
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return AppInfoParser.getVersion();
+ }
+
+ /**
+ * A sink lifecycle method. Validates grid-specific sink properties.
+ *
+ * @param props Sink properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ configProps = props;
+
+ try {
+ A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+ A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+ A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+ }
+ catch (IllegalArgumentException e) {
+ throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+ }
+ }
+
+ /**
+ * Obtains a sink task class to be instantiated for feeding data into grid.
+ *
+ * @return IgniteSinkTask class.
+ */
+ @Override public Class<? extends Task> taskClass() {
+ return IgniteSinkTask.class;
+ }
+
+ /**
+ * Builds each config for <tt>maxTasks</tt> tasks.
+ *
+ * @param maxTasks Max number of tasks.
+ * @return Task configs.
+ */
+ @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+ List<Map<String, String>> taskConfigs = new ArrayList<>();
+ Map<String, String> taskProps = new HashMap<>();
+
+ taskProps.putAll(configProps);
+
+ for (int i = 0; i < maxTasks; i++)
+ taskConfigs.add(taskProps);
+
+ return taskConfigs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
new file mode 100644
index 0000000..7680d96
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSinkConstants {
+ /** Ignite configuration file path. */
+ public static final String CACHE_CFG_PATH = "igniteCfg";
+
+ /** Cache name. */
+ public static final String CACHE_NAME = "cacheName";
+
+ /** Flag to enable overwriting existing values in cache. */
+ public static final String CACHE_ALLOW_OVERWRITE = "cacheAllowOverwrite";
+
+ /** Size of per-node buffer before data is sent to remote node. */
+ public static final String CACHE_PER_NODE_DATA_SIZE = "cachePerNodeDataSize";
+
+ /** Maximum number of parallel stream operations per node. */
+ public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
new file mode 100644
index 0000000..3d9a00d
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume sequences of SinkRecords and write data to grid.
+ */
+public class IgniteSinkTask extends SinkTask {
+ /** Logger. */
+ private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
+
+ /** Flag for stopped state. */
+ private static volatile boolean stopped = true;
+
+ /** Ignite grid configuration file. */
+ private static String igniteConfigFile;
+
+ /** Cache name. */
+ private static String cacheName;
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return new IgniteSinkConnector().version();
+ }
+
+ /**
+ * Initializes grid client from configPath.
+ *
+ * @param props Task properties.
+ */
+ @Override public void start(Map<String, String> props) {
+ // Each task has the same parameters -- avoid setting more than once.
+ if (cacheName != null)
+ return;
+
+ cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
+ igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+ StreamerContext.getStreamer().allowOverwrite(
+ Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+ StreamerContext.getStreamer().perNodeBufferSize(
+ Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+ if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+ StreamerContext.getStreamer().perNodeParallelOperations(
+ Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
+ stopped = false;
+ }
+
+ /**
+ * Buffers records.
+ *
+ * @param records Records to inject into grid.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void put(Collection<SinkRecord> records) {
+ try {
+ for (SinkRecord record : records) {
+ if (record.key() != null) {
+ // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+ StreamerContext.getStreamer().addData(record.key(), record.value());
+ }
+ else {
+ log.error("Failed to stream a record with null key!");
+ }
+
+ }
+ }
+ catch (ConnectException e) {
+ log.error("Failed adding record", e);
+
+ throw new ConnectException(e);
+ }
+ }
+
+ /**
+ * Pushes buffered data to grid. Flush interval is configured by worker configurations.
+ *
+ * @param offsets Offset information.
+ */
+ @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ if (stopped)
+ return;
+
+ StreamerContext.getStreamer().flush();
+ }
+
+ /**
+ * Stops the grid client.
+ */
+ @Override public void stop() {
+ if (stopped)
+ return;
+
+ stopped = true;
+
+ StreamerContext.getStreamer().close();
+ StreamerContext.getIgnite().close();
+ }
+
+ /**
+ * Streamer context initializing grid and data streamer instances on demand.
+ */
+ public static class StreamerContext {
+ /** Constructor. */
+ private StreamerContext() {
+ }
+
+ /** Instance holder. */
+ private static class Holder {
+ private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+ private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
+ }
+
+ /**
+ * Obtains grid instance.
+ *
+ * @return Grid instance.
+ */
+ public static Ignite getIgnite() {
+ return Holder.IGNITE;
+ }
+
+ /**
+ * Obtains data streamer instance.
+ *
+ * @return Data streamer instance.
+ */
+ public static IgniteDataStreamer getStreamer() {
+ return Holder.STREAMER;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
index 9115ab4..731f540 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -18,9 +18,10 @@
package org.apache.ignite.stream.kafka;
import junit.framework.TestSuite;
+import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
/**
- * Apache Kafka streamer tests.
+ * Apache Kafka streamers tests.
*/
public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
/**
@@ -30,8 +31,12 @@ public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite");
+ // Kafka streamer.
suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
+ // Kafka streamer via Connect API.
+ suite.addTest(new TestSuite(IgniteSinkConnectorTest.class));
+
return suite;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
deleted file mode 100644
index 5e7cee7..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-import kafka.admin.AdminUtils;
-import kafka.api.LeaderAndIsr;
-import kafka.api.PartitionStateInfo;
-import kafka.api.Request;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.StringEncoder;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
-/**
- * Kafka Embedded Broker.
- */
-public class KafkaEmbeddedBroker {
- /** Default ZooKeeper host. */
- private static final String ZK_HOST = "localhost";
-
- /** Broker port. */
- private static final int BROKER_PORT = 9092;
-
- /** ZooKeeper connection timeout. */
- private static final int ZK_CONNECTION_TIMEOUT = 6000;
-
- /** ZooKeeper session timeout. */
- private static final int ZK_SESSION_TIMEOUT = 6000;
-
- /** ZooKeeper port. */
- private static int zkPort = 0;
-
- /** Is ZooKeeper ready. */
- private boolean zkReady;
-
- /** Kafka config. */
- private KafkaConfig brokerCfg;
-
- /** Kafka server. */
- private KafkaServer kafkaSrv;
-
- /** ZooKeeper client. */
- private ZkClient zkClient;
-
- /** Embedded ZooKeeper. */
- private EmbeddedZooKeeper zooKeeper;
-
- /**
- * Creates an embedded Kafka broker.
- */
- public KafkaEmbeddedBroker() {
- try {
- setupEmbeddedZooKeeper();
-
- setupEmbeddedKafkaServer();
- }
- catch (IOException | InterruptedException e) {
- throw new RuntimeException("Failed to start Kafka broker " + e);
- }
- }
-
- /**
- * @return ZooKeeper address.
- */
- public static String getZKAddress() {
- return ZK_HOST + ':' + zkPort;
- }
-
- /**
- * Creates a Topic.
- *
- * @param topic Topic name.
- * @param partitions Number of partitions for the topic.
- * @param replicationFactor Replication factor.
- * @throws TimeoutException If operation is timed out.
- * @throws InterruptedException If interrupted.
- */
- public void createTopic(String topic, int partitions, int replicationFactor)
- throws TimeoutException, InterruptedException {
- AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-
- waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
- }
-
- /**
- * Sends message to Kafka broker.
- *
- * @param keyedMessages List of keyed messages.
- * @return Producer used to send the message.
- */
- public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
- Producer<String, String> producer = new Producer<>(getProducerConfig());
-
- producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
-
- return producer;
- }
-
- /**
- * Shuts down Kafka broker.
- */
- public void shutdown() {
- zkReady = false;
-
- if (kafkaSrv != null)
- kafkaSrv.shutdown();
-
- List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());
-
- for (String logDir : logDirs)
- U.delete(new File(logDir));
-
- if (zkClient != null) {
- zkClient.close();
-
- zkClient = null;
- }
-
- if (zooKeeper != null) {
-
- try {
- zooKeeper.shutdown();
- }
- catch (IOException e) {
- // No-op.
- }
-
- zooKeeper = null;
- }
- }
-
- /**
- * @return ZooKeeper client.
- */
- private ZkClient getZkClient() {
- A.ensure(zkReady, "Zookeeper not setup yet");
- A.notNull(zkClient, "Zookeeper client is not yet initialized");
-
- return zkClient;
- }
-
- /**
- * Checks if topic metadata is propagated.
- *
- * @param topic Topic name.
- * @param part Partition.
- * @return {@code True} if propagated, otherwise {@code false}.
- */
- private boolean isMetadataPropagated(String topic, int part) {
- scala.Option<PartitionStateInfo> partStateOption =
- kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);
-
- if (!partStateOption.isDefined())
- return false;
-
- PartitionStateInfo partState = partStateOption.get();
-
- LeaderAndIsr LeaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
- return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
- Request.isValidBrokerId(LeaderAndIsr.leader()) && LeaderAndIsr.isr().size() >= 1;
- }
-
- /**
- * Waits until metadata is propagated.
- *
- * @param topic Topic name.
- * @param part Partition.
- * @param timeout Timeout value in millis.
- * @param interval Interval in millis to sleep.
- * @throws TimeoutException If operation is timed out.
- * @throws InterruptedException If interrupted.
- */
- private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
- throws TimeoutException, InterruptedException {
- int attempt = 1;
-
- long startTime = System.currentTimeMillis();
-
- while (true) {
- if (isMetadataPropagated(topic, part))
- return;
-
- long duration = System.currentTimeMillis() - startTime;
-
- if (duration < timeout)
- Thread.sleep(interval);
- else
- throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);
-
- attempt++;
- }
- }
-
- /**
- * Sets up embedded Kafka server.
- *
- * @throws IOException If failed.
- */
- private void setupEmbeddedKafkaServer() throws IOException {
- A.ensure(zkReady, "Zookeeper should be setup before hand");
-
- brokerCfg = new KafkaConfig(getBrokerConfig());
-
- kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
-
- kafkaSrv.startup();
- }
-
- /**
- * Sets up embedded ZooKeeper.
- *
- * @throws IOException If failed.
- * @throws InterruptedException If interrupted.
- */
- private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
- EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
-
- zooKeeper.startup();
-
- zkPort = zooKeeper.getActualPort();
-
- zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
-
- zkReady = true;
- }
-
- /**
- * @return Kafka broker address.
- */
- private static String getBrokerAddress() {
- return ZK_HOST + ':' + BROKER_PORT;
- }
-
- /**
- * Gets Kafka broker config.
- *
- * @return Kafka broker config.
- * @throws IOException If failed.
- */
- private static Properties getBrokerConfig() throws IOException {
- Properties props = new Properties();
-
- props.put("broker.id", "0");
- props.put("host.name", ZK_HOST);
- props.put("port", "" + BROKER_PORT);
- props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
- props.put("zookeeper.connect", getZKAddress());
- props.put("log.flush.interval.messages", "1");
- props.put("replica.socket.timeout.ms", "1500");
-
- return props;
- }
-
- /**
- * @return Kafka Producer config.
- */
- private static ProducerConfig getProducerConfig() {
- Properties props = new Properties();
-
- props.put("metadata.broker.list", getBrokerAddress());
- props.put("serializer.class", StringEncoder.class.getName());
- props.put("key.serializer.class", StringEncoder.class.getName());
- props.put("partitioner.class", SimplePartitioner.class.getName());
-
- return new ProducerConfig(props);
- }
-
- /**
- * Creates temp directory.
- *
- * @param prefix Prefix.
- * @return Created file.
- * @throws IOException If failed.
- */
- private static File createTempDir( String prefix) throws IOException {
- Path path = Files.createTempDirectory(prefix);
-
- return path.toFile();
- }
-
- /**
- * Creates embedded ZooKeeper.
- */
- private static class EmbeddedZooKeeper {
- /** Default ZooKeeper host. */
- private final String zkHost;
-
- /** Default ZooKeeper port. */
- private final int zkPort;
-
- /** NIO context factory. */
- private NIOServerCnxnFactory factory;
-
- /** Snapshot directory. */
- private File snapshotDir;
-
- /** Log directory. */
- private File logDir;
-
- /**
- * Creates an embedded ZooKeeper.
- *
- * @param zkHost ZooKeeper host.
- * @param zkPort ZooKeeper port.
- */
- EmbeddedZooKeeper(String zkHost, int zkPort) {
- this.zkHost = zkHost;
- this.zkPort = zkPort;
- }
-
- /**
- * Starts up ZooKeeper.
- *
- * @throws IOException If failed.
- * @throws InterruptedException If interrupted.
- */
- void startup() throws IOException, InterruptedException {
- snapshotDir = createTempDir("_ss");
-
- logDir = createTempDir("_log");
-
- ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
-
- factory = new NIOServerCnxnFactory();
-
- factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
-
- factory.startup(zkSrv);
- }
-
- /**
- * @return Actual port ZooKeeper is started.
- */
- int getActualPort() {
- return factory.getLocalPort();
- }
-
- /**
- * Shuts down ZooKeeper.
- *
- * @throws IOException If failed.
- */
- void shutdown() throws IOException {
- if (factory != null) {
- factory.shutdown();
-
- U.delete(snapshotDir);
-
- U.delete(logDir);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 927ba3d..829c877 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.consumer.ConsumerConfig;
import kafka.producer.KeyedMessage;
@@ -40,14 +41,13 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.getZKAddress;
/**
* Tests {@link KafkaStreamer}.
*/
public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
/** Embedded Kafka. */
- private KafkaEmbeddedBroker embeddedBroker;
+ private TestKafkaBroker embeddedBroker;
/** Count. */
private static final int CNT = 100;
@@ -77,7 +77,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
- embeddedBroker = new KafkaEmbeddedBroker();
+ embeddedBroker = new TestKafkaBroker();
}
/** {@inheritDoc} */
@@ -176,7 +176,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
kafkaStmr.setThreads(4);
// Set the consumer configuration.
- kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));
+ kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
// Set the decoders.
StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
@@ -199,7 +199,8 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
- latch.await();
+ // Checks all events successfully processed in 10 seconds.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
for (Map.Entry<String, String> entry : keyValMap.entrySet())
assertEquals(entry.getValue(), cache.get(entry.getKey()));
@@ -232,4 +233,4 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
return new ConsumerConfig(props);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
deleted file mode 100644
index b49bebe..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Simple partitioner for Kafka.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner implements Partitioner {
- /**
- * Constructs instance.
- *
- * @param props Properties.
- */
- public SimplePartitioner(VerifiableProperties props) {
- // No-op.
- }
-
- /**
- * Partitions the key based on the key value.
- *
- * @param key Key.
- * @param partSize Partition size.
- * @return partition Partition.
- */
- public int partition(Object key, int partSize) {
- String keyStr = (String)key;
-
- String[] keyValues = keyStr.split("\\.");
-
- Integer intKey = Integer.parseInt(keyValues[3]);
-
- return intKey > 0 ? intKey % partSize : 0;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
new file mode 100644
index 0000000..70acb78
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import kafka.producer.ProducerConfig;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.curator.test.TestingServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import scala.Tuple2;
+
+/**
+ * Kafka Test Broker.
+ */
+public class TestKafkaBroker {
+ /** ZooKeeper connection timeout. */
+ private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+ /** ZooKeeper session timeout. */
+ private static final int ZK_SESSION_TIMEOUT = 6000;
+
+ /** ZooKeeper port. */
+ private static final int ZK_PORT = 21811;
+
+ /** Broker host. */
+ private static final String BROKER_HOST = "localhost";
+
+ /** Broker port. */
+ private static final int BROKER_PORT = 9092;
+
+ /** Kafka config. */
+ private KafkaConfig kafkaCfg;
+
+ /** Kafka server. */
+ private KafkaServer kafkaSrv;
+
+ /** ZooKeeper. */
+ private TestingServer zkServer;
+
+ /** Kafka Zookeeper utils. */
+ private ZkUtils zkUtils;
+
+ /**
+ * Kafka broker constructor. Starts the ZooKeeper test server first and then the Kafka server
+ * that connects to it.
+ *
+ * @throws RuntimeException If ZooKeeper or Kafka failed to start; the original failure is attached
+ * as the cause.
+ */
+ public TestKafkaBroker() {
+ try {
+ setupZooKeeper();
+
+ setupKafkaServer();
+ }
+ catch (Exception e) {
+ // Pass the original exception as the cause so that its stack trace is not lost.
+ throw new RuntimeException("Failed to start Kafka: " + e, e);
+ }
+ }
+
+ /**
+ * Creates a topic on this test broker by delegating to Kafka's {@code TestUtils.createTopic}
+ * with this broker as the only server and an empty topic configuration.
+ *
+ * @param topic Topic name.
+ * @param partitions Number of partitions for the topic.
+ * @param replicationFactor Replication factor.
+ * @throws TimeoutException If operation is timed out.
+ * @throws InterruptedException If interrupted.
+ */
+ public void createTopic(String topic, int partitions, int replicationFactor)
+ throws TimeoutException, InterruptedException {
+ // The only broker that hosts the topic is the one owned by this instance.
+ List<KafkaServer> brokers = new ArrayList<>();
+
+ brokers.add(kafkaSrv);
+
+ Properties topicCfg = new Properties();
+
+ TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
+ scala.collection.JavaConversions.asScalaBuffer(brokers), topicCfg);
+ }
+
+ /**
+ * Sends the given messages to the Kafka broker through a newly created producer.
+ * The producer is not closed here; it is returned so that the caller can close it.
+ *
+ * @param keyedMessages List of keyed messages.
+ * @return Producer used to send the messages.
+ */
+ public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+ ProducerConfig producerCfg = getProducerConfig();
+
+ Producer<String, String> sender = new Producer<>(producerCfg);
+
+ sender.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
+ return sender;
+ }
+
+ /**
+ * Shuts down test Kafka broker: closes the ZooKeeper utils, stops the Kafka server, stops the
+ * ZooKeeper test server and deletes the Kafka log directories.
+ * <p>
+ * Every step is attempted even if a previous one fails, because the broker and ZooKeeper listen
+ * on fixed ports and a leaked server would break the tests that start afterwards.
+ */
+ public void shutdown() {
+ try {
+ if (zkUtils != null)
+ zkUtils.close();
+ }
+ finally {
+ try {
+ if (kafkaSrv != null)
+ kafkaSrv.shutdown();
+ }
+ finally {
+ if (zkServer != null) {
+ try {
+ zkServer.stop();
+ }
+ catch (IOException ignored) {
+ // Best-effort stop: nothing can be done about a failure during test cleanup.
+ }
+ }
+
+ // Guarded like the other fields so that a partially initialized broker can be shut down.
+ if (kafkaCfg != null) {
+ List<String> logDirs = scala.collection.JavaConversions.seqAsJavaList(kafkaCfg.logDirs());
+
+ for (String logDir : logDirs)
+ U.delete(new File(logDir));
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets up test Kafka broker: builds the broker configuration, creates the server from it
+ * and starts the server.
+ *
+ * @throws IOException If failed.
+ */
+ private void setupKafkaServer() throws IOException {
+ KafkaConfig cfg = new KafkaConfig(getKafkaConfig());
+
+ kafkaCfg = cfg;
+
+ KafkaServer srv = TestUtils.createServer(cfg, SystemTime$.MODULE$);
+
+ kafkaSrv = srv;
+
+ // NOTE(review): TestUtils.createServer may already start the server - confirm whether this
+ // explicit startup() call is still needed.
+ srv.startup();
+ }
+
+ /**
+ * Sets up ZooKeeper test server on {@code ZK_PORT} and creates the Kafka ZooKeeper utils
+ * connected to it.
+ *
+ * @throws Exception If failed.
+ */
+ private void setupZooKeeper() throws Exception {
+ // Second argument is presumably the "start immediately" flag - confirm against Curator docs.
+ zkServer = new TestingServer(ZK_PORT, true);
+
+ String connStr = zkServer.getConnectString();
+
+ Tuple2<ZkClient, ZkConnection> clientAndConn = ZkUtils.createZkClientAndConnection(connStr,
+ ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
+
+ ZkClient client = clientAndConn._1();
+ ZkConnection conn = clientAndConn._2();
+
+ zkUtils = new ZkUtils(client, conn, false);
+ }
+
+ /**
+ * Obtains Kafka config: a single broker (id 0) bound to {@code BROKER_HOST:BROKER_PORT},
+ * connected to the ZooKeeper test server and writing its log to a fresh temporary directory.
+ *
+ * @return Kafka config.
+ * @throws IOException If the temporary log directory could not be created.
+ */
+ private Properties getKafkaConfig() throws IOException {
+ Properties props = new Properties();
+
+ props.put("broker.id", "0");
+ props.put("zookeeper.connect", zkServer.getConnectString());
+ props.put("host.name", BROKER_HOST);
+ // Properties values are expected to be strings: a non-string value is invisible to getProperty().
+ props.put("port", String.valueOf(BROKER_PORT));
+ props.put("offsets.topic.replication.factor", "1");
+ props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
+ props.put("log.flush.interval.messages", "1");
+
+ return props;
+ }
+
+ /**
+ * Obtains broker address in the {@code host:port} form, built from the {@code BROKER_HOST}
+ * and {@code BROKER_PORT} constants.
+ *
+ * @return Kafka broker address.
+ */
+ public String getBrokerAddress() {
+ return BROKER_HOST + ":" + BROKER_PORT;
+ }
+
+ /**
+ * Obtains Zookeeper address in the {@code host:port} form. The value is built from the
+ * {@code BROKER_HOST} and {@code ZK_PORT} constants and is not read from the running
+ * ZooKeeper test server.
+ *
+ * @return Zookeeper address.
+ */
+ public String getZookeeperAddress() {
+ return BROKER_HOST + ":" + ZK_PORT;
+ }
+
+ /**
+ * Obtains producer config that points both broker list properties at this test broker
+ * and serializes messages with Kafka's string encoder.
+ *
+ * @return Kafka Producer config.
+ */
+ private ProducerConfig getProducerConfig() {
+ String brokerAddr = getBrokerAddress();
+
+ Properties producerProps = new Properties();
+
+ producerProps.put("metadata.broker.list", brokerAddr);
+ producerProps.put("bootstrap.servers", brokerAddr);
+ producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
+
+ return new ProducerConfig(producerProps);
+ }
+
+ /**
+ * Creates a new temporary directory whose name starts with the given prefix.
+ *
+ * @param prefix Prefix.
+ * @return Created directory.
+ * @throws IOException If failed.
+ */
+ private static File createTmpDir(String prefix) throws IOException {
+ return Files.createTempDirectory(prefix).toFile();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
new file mode 100644
index 0000000..a8583d0
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.mock;
+
+/**
+ * Tests for {@link IgniteSinkConnector}.
+ */
+public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
+ /** Number of input messages. */
+ private static final int EVENT_CNT = 10000;
+
+ /** Cache name. */
+ private static final String CACHE_NAME = "testCache";
+
+ /** Test topics. */
+ private static final String[] TOPICS = {"test1", "test2"};
+
+ /** Kafka partition. */
+ private static final int PARTITIONS = 3;
+
+ /** Kafka replication factor. */
+ private static final int REPLICATION_FACTOR = 1;
+
+ /** Test Kafka broker. */
+ private TestKafkaBroker kafkaBroker;
+
+ /** Worker to run tasks. */
+ private Worker worker;
+
+ /** Workers' herder. */
+ private Herder herder;
+
+ /** Ignite server node. */
+ private Ignite grid;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected void beforeTest() throws Exception {
+ // The XML configuration enables client mode; the node started here has to be a server,
+ // so the flag is overridden before the start.
+ IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+ cfg.setClientMode(false);
+
+ grid = startGrid("igniteServerNode", cfg);
+
+ // The broker must be up before the topics are created and before the Connect worker starts.
+ kafkaBroker = new TestKafkaBroker();
+
+ for (String topic : TOPICS)
+ kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
+
+ WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+
+ // NOTE(review): the mock is only recorded against here and is never switched to replay mode
+ // in this method - confirm the worker does not rely on any configured mock behavior.
+ OffsetBackingStore offsetBackingStore = mock(OffsetBackingStore.class);
+ offsetBackingStore.configure(anyObject(Map.class));
+
+ worker = new Worker(workerConfig, offsetBackingStore);
+ worker.start();
+
+ herder = new StandaloneHerder(worker);
+ herder.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ herder.stop();
+
+ worker.stop();
+
+ kafkaBroker.shutdown();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
+ * specified Kafka topics, because a sink task can read from multiple topics.
+ *
+ * @throws Exception Thrown in case of the failure.
+ */
+ public void testSinkPuts() throws Exception {
+ Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+ @Override
+ public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+ if (error != null)
+ throw new RuntimeException("Failed to create a job!");
+ }
+ });
+
+ herder.putConnectorConfig(
+ sinkProps.get(ConnectorConfig.NAME_CONFIG),
+ sinkProps, false, cb);
+
+ cb.get();
+
+ final CountDownLatch latch = new CountDownLatch(EVENT_CNT * TOPICS.length);
+
+ final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt != null;
+
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+
+ IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+ assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+ Map<String, String> keyValMap = new HashMap<>(EVENT_CNT * TOPICS.length);
+
+ // Produces events for the specified number of topics
+ for (String topic : TOPICS)
+ keyValMap.putAll(produceStream(topic));
+
+ // Checks all events successfully processed in 10 seconds.
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+ // Checks that each event was processed properly.
+ for (Map.Entry<String, String> entry : keyValMap.entrySet())
+ assertEquals(entry.getValue(), cache.get(entry.getKey()));
+
+ assertEquals(EVENT_CNT * TOPICS.length, cache.size(CachePeekMode.PRIMARY));
+ }
+
+ /**
+ * Sends {@code EVENT_CNT} messages to the given Kafka topic. Keys have the form
+ * {@code <topic>_<index>}; values are the current time in millis followed by the index.
+ * The producer used for sending is closed before returning.
+ *
+ * @param topic Topic name.
+ * @return Map of key value messages.
+ */
+ private Map<String, String> produceStream(String topic) {
+ Map<String, String> sent = new HashMap<>();
+
+ List<KeyedMessage<String, String>> batch = new ArrayList<>(EVENT_CNT);
+
+ for (int idx = 0; idx < EVENT_CNT; idx++) {
+ String key = topic + "_" + idx;
+ String val = Long.toString(System.currentTimeMillis()) + idx;
+
+ batch.add(new KeyedMessage<>(topic, key, val));
+
+ sent.put(key, val);
+ }
+
+ kafkaBroker.sendMessages(batch).close();
+
+ return sent;
+ }
+
+ /**
+ * Creates properties for test sink connector: the connector reads the given topics with at most
+ * two tasks and streams the data into the test cache with overwrite allowed.
+ *
+ * @param topics Comma-separated topic names.
+ * @return Test sink connector properties.
+ */
+ private Map<String, String> makeSinkProps(String topics) {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(ConnectorConfig.TOPICS_CONFIG, topics);
+ props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+ props.put(ConnectorConfig.NAME_CONFIG, "test-connector");
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
+ // Same constant the test uses to look the cache up, so the two names cannot diverge.
+ props.put(IgniteSinkConstants.CACHE_NAME, CACHE_NAME);
+ props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
+ props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+
+ return props;
+ }
+
+ /**
+ * Creates properties for Kafka Connect workers: string converters without schemas for both
+ * internal and regular keys and values, the test broker as the bootstrap server and a short
+ * offset commit interval.
+ *
+ * @return Worker configurations.
+ */
+ private Map<String, String> makeWorkerProps() {
+ // The same converter is used for internal and regular keys and values.
+ String strConverter = "org.apache.kafka.connect.storage.StringConverter";
+
+ Map<String, String> workerProps = new HashMap<>();
+
+ workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, strConverter);
+ workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, strConverter);
+ workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, strConverter);
+ workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, strConverter);
+
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("key.converter.schemas.enable", "false");
+ workerProps.put("value.converter.schemas.enable", "false");
+
+ workerProps.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+
+ // Fast flushing for testing.
+ workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+ return workerProps;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/resources/example-ignite.xml b/modules/kafka/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..fbb05d3
--- /dev/null
+++ b/modules/kafka/src/test/resources/example-ignite.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ Ignite client-node configuration with a test cache, cache put events enabled, and TCP discovery over a fixed address list.
+ Used for testing IgniteSink running Ignite in client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <!-- Enable client mode. -->
+ <property name="clientMode" value="true"/>
+
+ <!-- Cache accessed from IgniteSink. -->
+ <property name="cacheConfiguration">
+ <list>
+ <!-- Partitioned cache example configuration, with settings adjusted to match the server nodes'. -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="atomicityMode" value="ATOMIC"/>
+
+ <property name="name" value="testCache"/>
+ </bean>
+ </list>
+ </property>
+
+ <!-- Enable cache events. -->
+ <property name="includeEventTypes">
+ <list>
+ <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+ <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+ </list>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:47500</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 8f02fec..21d8c69 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -61,6 +61,7 @@
<commons.lang.version>2.6</commons.lang.version>
<cron4j.version>2.2.5</cron4j.version>
<curator.version>2.9.1</curator.version>
+ <easymock.version>3.4</easymock.version>
<ezmorph.bundle.version>1.0.6_1</ezmorph.bundle.version>
<ezmorph.version>1.0.6</ezmorph.version>
<flume.ng.version>1.6.0</flume.ng.version>
@@ -82,11 +83,9 @@
<jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
<jsonlib.version>2.4</jsonlib.version>
<jtidy.version>r938</jtidy.version>
- <kafka.bundle.version>0.8.2.1_1</kafka.bundle.version>
- <kafka.clients.bundle.version>0.8.2.0_1</kafka.clients.bundle.version>
- <kafka.clients.version>0.8.2.0</kafka.clients.version>
- <kafka.version>0.8.2.1</kafka.version>
- <kafka.version>0.8.2.1</kafka.version>
+ <kafka.bundle.version>0.9.0.0_1</kafka.bundle.version>
+ <kafka.clients.bundle.version>0.9.0.0_1</kafka.clients.bundle.version>
+ <kafka.version>0.9.0.0</kafka.version>
<karaf.version>4.0.2</karaf.version>
<lucene.bundle.version>3.5.0_1</lucene.bundle.version>
<lucene.version>3.5.0</lucene.version>