You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/01 11:24:10 UTC

[01/12] ignite git commit: Proper initialization of utility and marshalling pools.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2224 eccbd0d80 -> 21567fab3


Proper initialization of utility and marshalling pools.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2e5638d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2e5638d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2e5638d

Branch: refs/heads/ignite-2224
Commit: c2e5638d01fbe44f90878d1aee6bf4979a935a7c
Parents: a34d705
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jan 29 15:51:44 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Jan 29 15:51:44 2016 +0300

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/internal/IgnitionEx.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c2e5638d/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5153fb3..8f23b05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1662,7 +1662,7 @@ public class IgnitionEx {
                 "utility",
                 cfg.getGridName(),
                 myCfg.getUtilityCacheThreadPoolSize(),
-                DFLT_SYSTEM_MAX_THREAD_CNT,
+                myCfg.getUtilityCacheThreadPoolSize(),
                 myCfg.getUtilityCacheKeepAliveTime(),
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
@@ -1670,7 +1670,7 @@ public class IgnitionEx {
                 "marshaller-cache",
                 cfg.getGridName(),
                 myCfg.getMarshallerCacheThreadPoolSize(),
-                DFLT_SYSTEM_MAX_THREAD_CNT,
+                myCfg.getMarshallerCacheThreadPoolSize(),
                 myCfg.getMarshallerCacheKeepAliveTime(),
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 


[12/12] ignite git commit: ignite-2224 getEntry method inside transaction.

Posted by sb...@apache.org.
ignite-2224 getEntry method inside transaction.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21567fab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21567fab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21567fab

Branch: refs/heads/ignite-2224
Commit: 21567fab3afc5c7f2fc7bb4204cf75f2328511d0
Parents: 4ede58b
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 1 13:23:13 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 1 13:23:13 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheEntryImplEx.java      |  14 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   4 +-
 ...arOptimisticSerializableTxPrepareFuture.java |   2 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   4 +
 .../GridNearPessimisticTxPrepareFuture.java     |   2 +
 .../cache/transactions/IgniteTxEntry.java       |  32 +-
 .../transactions/IgniteTxLocalAdapter.java      |  50 +-
 .../cache/transactions/IgniteTxManager.java     |   2 +-
 .../cache/CacheGetEntryAbstractSeltTest.java    | 525 ------------
 .../cache/CacheGetEntryAbstractTest.java        | 803 +++++++++++++++++++
 ...GetEntryOptimisticReadCommittedSeltTest.java |   5 +-
 ...etEntryOptimisticRepeatableReadSeltTest.java |   5 +-
 ...eGetEntryOptimisticSerializableSeltTest.java |   5 +-
 ...etEntryPessimisticReadCommittedSeltTest.java |   5 +-
 ...tEntryPessimisticRepeatableReadSeltTest.java |   5 +-
 ...GetEntryPessimisticSerializableSeltTest.java |   5 +-
 16 files changed, 904 insertions(+), 564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
index 1c7111a..af926c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImplEx.java
@@ -21,9 +21,13 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
+import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
+
 /**
  *
  */
@@ -54,6 +58,14 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach
 
     /** {@inheritDoc} */
     public GridCacheVersion version() {
+        if (ver == GET_ENTRY_INVALID_VER_AFTER_GET) {
+            throw new IgniteException("Impossible to get entry version after " +
+                "get() inside OPTIMISTIC REPEATABLE_READ transaction. Use only getEntry() or getEntries() inside " +
+                "OPTIMISTIC REPEATABLE_READ transaction to solve this problem.");
+        }
+        else if (ver == GET_ENTRY_INVALID_VER_UPDATED)
+            throw new IgniteException("Impossible to get version for entry updated in transaction.");
+
         return ver;
     }
 
@@ -81,7 +93,7 @@ public class CacheEntryImplEx<K, V> extends CacheEntryImpl<K, V> implements Cach
         String res = "CacheEntry [key=" + getKey() +
             ", val=" + getValue();
 
-        if (ver != null) {
+        if (ver != null && ver != GET_ENTRY_INVALID_VER_AFTER_GET && ver != GET_ENTRY_INVALID_VER_UPDATED) {
             res += ", topVer=" + ver.topologyVersion() +
                 ", nodeOrder=" + ver.nodeOrder() +
                 ", order=" + ver.order() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d8b2f37..41b28d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -946,7 +946,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (retVal ||
             !F.isEmpty(e.entryProcessors()) ||
             !F.isEmpty(e.filters()) ||
-            e.serializableReadVersion() != null) {
+            e.entryReadVersion() != null) {
             if (map == null)
                 map = new HashMap<>();
 
@@ -1013,7 +1013,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         throws IgniteCheckedException {
         try {
             for (IgniteTxEntry entry : entries) {
-                GridCacheVersion serReadVer = entry.serializableReadVersion();
+                GridCacheVersion serReadVer = entry.entryReadVersion();
 
                 if (serReadVer != null) {
                     entry.cached().unswap();

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 4f9f227..52ebfc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -107,7 +107,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
             if (txEntry != null) {
                 if (entry.context().isLocal()) {
-                    GridCacheVersion serReadVer = txEntry.serializableReadVersion();
+                    GridCacheVersion serReadVer = txEntry.entryReadVersion();
 
                     if (serReadVer != null) {
                         GridCacheContext ctx = entry.context();

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index bae0327..b968e57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -279,6 +279,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      * @param topLocked {@code True} if thread already acquired lock preventing topology change.
      */
     private void prepareSingle(IgniteTxEntry write, boolean topLocked) {
+        write.clearEntryReadVersion();
+
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
         assert topVer.topologyVersion() > 0;
@@ -339,6 +341,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
 
         for (IgniteTxEntry write : writes) {
+            write.clearEntryReadVersion();
+
             GridDistributedTxMapping updated = map(write, topVer, cur, topLocked);
 
             if (cur != updated) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 8170008..615a92b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -176,6 +176,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         txMapping = new GridDhtTxMapping();
 
         for (IgniteTxEntry txEntry : tx.allEntries()) {
+            txEntry.clearEntryReadVersion();
+
             GridCacheContext cacheCtx = txEntry.context();
 
             List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f731975..8b871a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
@@ -73,6 +74,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     /** Dummy version for any existing entry read in SERIALIZABLE transaction. */
     public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new GridCacheVersion(0, 0, 0, 1);
 
+    /** */
+    public static final GridCacheVersion GET_ENTRY_INVALID_VER_UPDATED = new GridCacheVersion(0, 0, 0, 2);
+
+    /** */
+    public static final GridCacheVersion GET_ENTRY_INVALID_VER_AFTER_GET = new GridCacheVersion(0, 0, 0, 3);
+
     /** Prepared flag updater. */
     private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
         AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
@@ -918,13 +925,30 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
-     * @param serReadVer Read version for serializable transaction.
+     * Gets stored entry version. Version is stored for all entries in serializable transaction or
+     * when value is read using {@link IgniteCache#getEntry(Object)} method.
+     *
+     * @return Entry version.
+     */
+    @Nullable public GridCacheVersion entryReadVersion() {
+        return serReadVer;
+    }
+
+    /**
+     * @param ver Entry version.
      */
-    public void serializableReadVersion(GridCacheVersion serReadVer) {
+    public void entryReadVersion(GridCacheVersion  ver) {
         assert this.serReadVer == null;
-        assert serReadVer != null;
+        assert ver != null;
 
-        this.serReadVer = serReadVer;
+        this.serReadVer = ver;
+    }
+
+    /**
+     * Clears recorded read version, should be done before starting commit of not serializable/optimistic transaction.
+     */
+    public void clearEntryReadVersion() {
+        serReadVer = null;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 298413b..a999358 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -1409,25 +1410,34 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
                         if (needVer) {
                             if (txEntry.op() != READ)
-                                ver = xidVersion();
+                                ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_UPDATED;
                             else {
-                                if (serializable()) {
-                                    ver = txEntry.serializableReadVersion();
+                                ver = txEntry.entryReadVersion();
 
-                                    assert ver != null; // TODO: value should not be null;
-                                }
-                                else if (optimistic()) {
-                                    assert isolation() == TransactionIsolation.REPEATABLE_READ;
+                                if (ver == null && pessimistic()) {
+                                    while (true) {
+                                        try {
+                                            GridCacheEntryEx cached = txEntry.cached();
+
+                                            ver = cached.isNear() ?
+                                                ((GridNearCacheEntry)cached).dhtVersion() : cached.version();
 
-                                    throw new IgniteCheckedException(
-                                        "Impossible to getEntry() or getEntries() after get() at " +
-                                            "OPTIMISTIC REPEATABLE_READ case. " +
-                                            "Use only getEntry() or getEntries() to solve the problem. ");
+                                            break;
+                                        }
+                                        catch (GridCacheEntryRemovedException rmvdErr) {
+                                            txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topVer));
+                                        }
+                                    }
                                 }
-                                else {
-                                    assert ver != null; // TODO: value should not be null;
+
+                                if (ver == null) {
+                                    assert optimistic() && repeatableRead() : this;
+
+                                    ver = IgniteTxEntry.GET_ENTRY_INVALID_VER_AFTER_GET;
                                 }
                             }
+
+                            assert ver != null;
                         }
 
                         cacheCtx.addResult(map, key, val, skipVals, keepCacheObjects, deserializeBinary, false, ver);
@@ -1601,7 +1611,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                 if (needReadVer) {
                                     assert readVer != null;
 
-                                    txEntry.serializableReadVersion(readVer);
+                                    txEntry.entryReadVersion(readVer);
                                 }
                             }
                         }
@@ -1751,7 +1761,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             if (needReadVer) {
                                 assert loadVer != null;
 
-                                txEntry.serializableReadVersion(loadVer);
+                                txEntry.entryReadVersion(loadVer);
                             }
 
                             if (visibleVal != null) {
@@ -1771,6 +1781,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         final GridCacheContext cacheCtx,
         Collection<KeyCacheObject> keys,
@@ -1915,6 +1926,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             deserializeBinary,
                                             false,
                                             readVer);
+
+                                        if (readVer != null)
+                                            txEntry.entryReadVersion(readVer);
                                     }
 
                                     // Even though we bring the value back from lock acquisition,
@@ -2399,7 +2413,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     if (needReadVer) {
                         assert loadVer != null;
 
-                        e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
+                        e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer);
                     }
 
                     if (singleRmv) {
@@ -2592,7 +2606,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             if (needReadVer) {
                                 assert readVer != null;
 
-                                txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                                txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
                             }
                         }
 
@@ -2645,7 +2659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             if (needReadVer) {
                                 assert readVer != null;
 
-                                txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
+                                txEntry.entryReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer);
                             }
 
                             if (retval && !transform)

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 7a3b8ff..643ba2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1419,7 +1419,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     assert !entry1.detached() : "Expected non-detached entry for near transaction " +
                         "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']';
 
-                    GridCacheVersion serReadVer = txEntry1.serializableReadVersion();
+                    GridCacheVersion serReadVer = txEntry1.entryReadVersion();
 
                     assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java
deleted file mode 100644
index 2e94cbc..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractSeltTest.java
+++ /dev/null
@@ -1,525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheEntry;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-
-/**
- * Test getEntry and getEntries methods.
- */
-public abstract class CacheGetEntryAbstractSeltTest extends GridCacheAbstractSelfTest {
-
-    @Override protected int gridCount() {
-        return 3;
-    }
-
-    abstract protected TransactionConcurrency concurrency();
-
-    abstract protected TransactionIsolation isolation();
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setMarshaller(null);
-
-        return cfg;
-    }
-
-    /** */
-    public void testNear() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.PARTITIONED);
-        cfg.setName("near");
-        cfg.setNearConfiguration(new NearCacheConfiguration());
-
-        test(cfg);
-    }
-
-    /** */
-    public void testNearTransactional() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.PARTITIONED);
-        cfg.setAtomicityMode(TRANSACTIONAL);
-        cfg.setName("nearT");
-        cfg.setNearConfiguration(new NearCacheConfiguration());
-
-        test(cfg);
-    }
-
-    /** */
-    public void testPartitioned() {
-        CacheConfiguration cfg = new CacheConfiguration();
-        cfg.setCacheMode(CacheMode.PARTITIONED);
-        cfg.setName("partitioned");
-
-        test(cfg);
-    }
-
-    /** */
-    public void testPartitionedTransactional() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.PARTITIONED);
-        cfg.setAtomicityMode(TRANSACTIONAL);
-        cfg.setName("partitionedT");
-
-        test(cfg);
-    }
-
-    /** */
-    public void testLocal() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.LOCAL);
-        cfg.setName("local");
-
-        test(cfg);
-    }
-
-    /** */
-    public void testLocalTransactional() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.LOCAL);
-        cfg.setAtomicityMode(TRANSACTIONAL);
-        cfg.setName("localT");
-
-        test(cfg);
-    }
-
-    /** */
-    public void testReplicated() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.REPLICATED);
-        cfg.setName("replicated");
-
-        test(cfg);
-    }
-
-    /** */
-    public void testReplicatedTransactional() {
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.REPLICATED);
-        cfg.setAtomicityMode(TRANSACTIONAL);
-        cfg.setName("replicatedT");
-
-        test(cfg);
-    }
-
-    /** */
-    private void test(CacheConfiguration cfg) {
-        test(cfg, true);
-        test(cfg, false);
-    }
-
-    /** */
-    private void test(CacheConfiguration cfg, boolean oneEntry) {
-        IgniteCache<Integer, TestValue> cache = grid(0).createCache(cfg);
-        try {
-            init(cache);
-
-            test(cache, null, null, null, oneEntry);
-
-            if (cfg.getAtomicityMode() == TRANSACTIONAL) {
-                TransactionConcurrency txConcurrency = concurrency();
-                TransactionIsolation txIsolation = isolation();
-                try (Transaction tx = grid(0).transactions().txStart(txConcurrency, txIsolation, 100000, 1000)) {
-                    initTx(cache);
-
-                    test(cache, txConcurrency, txIsolation, tx, oneEntry);
-
-                    tx.commit();
-                }
-            }
-        }
-        finally {
-            cache.destroy();
-        }
-    }
-
-    private Set<Integer> getKeys(int base) {
-        int start = 0;
-        int finish = 100;
-
-        Set<Integer> keys = new HashSet<>(finish - start);
-
-        for (int i = base + start; i < base + finish; ++i)
-            keys.add(i);
-
-        return keys;
-    }
-
-    private Set<Integer> createdBeforeTxKeys() {
-        return getKeys(0);
-    }
-
-    private Set<Integer> createdBeforeTxWithBinaryKeys() {
-        return getKeys(1_000);
-    }
-
-    private Set<Integer> createdBeforeTxKeys2() {
-        return getKeys(2_000);
-    }
-
-    private Set<Integer> createdBeforeTxWithBinaryKeys2() {
-        return getKeys(3_000);
-    }
-
-    private Set<Integer> createdBeforeTxKeys3() {
-        return getKeys(4_000);
-    }
-
-    private Set<Integer> createdBeforeTxWithBinaryKeys3() {
-        return getKeys(5_000);
-    }
-
-    private Set<Integer> removedBeforeTxKeys() {
-        return getKeys(6_000);
-    }
-
-    private Set<Integer> removedBeforeTxWithBinaryKeys() {
-        return getKeys(7_000);
-    }
-
-    private Set<Integer> createdAtTxKeys() {
-        return getKeys(8_000);
-    }
-
-    private Set<Integer> createdAtTxWithBinaryKeys() {
-        return getKeys(9_000);
-    }
-
-    private Set<Integer> removedAtTxKeys() {
-        return getKeys(10_000);
-    }
-
-    private Set<Integer> removedAtTxWithBinaryKeys() {
-        return getKeys(11_000);
-    }
-
-    /** */
-    private void init(IgniteCache<Integer, TestValue> cache) {
-        Set<Integer> keys = new HashSet<>();
-
-        keys.addAll(createdBeforeTxKeys());
-        keys.addAll(createdBeforeTxWithBinaryKeys());
-        keys.addAll(createdBeforeTxKeys2());
-        keys.addAll(createdBeforeTxWithBinaryKeys2());
-        keys.addAll(createdBeforeTxKeys3());
-        keys.addAll(createdBeforeTxWithBinaryKeys3());
-        keys.addAll(removedBeforeTxKeys());
-        keys.addAll(removedBeforeTxWithBinaryKeys());
-        keys.addAll(removedAtTxKeys());
-        keys.addAll(removedAtTxWithBinaryKeys());
-
-        for (int i : keys)
-            cache.put(i, new TestValue(i));
-
-        for (int i : removedBeforeTxKeys())
-            cache.remove(i);
-
-        for (int i : removedBeforeTxWithBinaryKeys())
-            cache.remove(i);
-    }
-
-    /** */
-    private void initTx(IgniteCache<Integer, TestValue> cache) {
-        for (int i : createdAtTxKeys())
-            cache.put(i, new TestValue(i));
-
-        for (int i : createdAtTxWithBinaryKeys())
-            cache.put(i, new TestValue(i));
-
-        for (int i : removedAtTxKeys())
-            cache.remove(i);
-
-        for (int i : removedAtTxWithBinaryKeys())
-            cache.remove(i);
-    }
-
-    /** */
-    private void compareVersionWithPrimaryNode(CacheEntry<Integer, ?> e, IgniteCache<Integer, TestValue> cache) {
-        CacheConfiguration cfg = cache.getConfiguration(CacheConfiguration.class);
-
-        if (cfg.getCacheMode() != CacheMode.LOCAL) {
-            Ignite prim = primaryNode(e.getKey(), cache.getName());
-
-            GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)prim).internalCache(cache.getName());
-
-            if (cfg.getNearConfiguration() != null)
-                cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
-
-            IgniteCacheObjectProcessor cacheObjects = cacheAdapter.context().cacheObjects();
-
-            CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
-
-            GridCacheMapEntry me = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(
-                cacheObjCtx, e.getKey(), true));
-
-            try {
-                assertEquals(me.version(), e.version());
-            }
-            catch (GridCacheEntryRemovedException ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-    }
-
-    private void checkData(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry, GridCacheVersion txVer) {
-        if (oneEntry) {
-            CacheEntry<Integer, TestValue> e = cache.getEntry(i);
-
-            if (txVer != null)
-                assertEquals(txVer, e.version());
-            else
-                compareVersionWithPrimaryNode(e, cache);
-
-            assertEquals(e.getValue().val, i);
-        }
-        else {
-            Set<Integer> set = new HashSet<>();
-
-            for (int j = 0; j < 10; j++)
-                set.add(i + j);
-
-            Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
-
-            for (CacheEntry<Integer, TestValue> e : es) {
-                if (txVer != null)
-                    assertEquals(txVer, e.version());
-                else
-                    compareVersionWithPrimaryNode(e, cache);
-
-                assertEquals((Integer)e.getValue().val, e.getKey());
-
-                assertTrue(set.contains(e.getValue().val));
-            }
-        }
-    }
-
-    private void checkBinaryData(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry,
-        GridCacheVersion txVer) {
-        IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
-
-        if (oneEntry) {
-            CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
-
-            if (txVer != null)
-                assertEquals(txVer, e.version());
-            else
-                compareVersionWithPrimaryNode(e, cache);
-
-            assertEquals(((TestValue)e.getValue().deserialize()).val, i);
-        }
-        else {
-            Set<Integer> set = new HashSet<>();
-
-            for (int j = 0; j < 10; j++)
-                set.add(i + j);
-
-            Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
-
-            for (CacheEntry<Integer, BinaryObject> e : es) {
-                if (txVer != null)
-                    assertEquals(txVer, e.version());
-                else
-                    compareVersionWithPrimaryNode(e, cache);
-
-                TestValue tv = e.getValue().deserialize();
-
-                assertEquals((Integer)tv.val, e.getKey());
-
-                assertTrue(set.contains((tv).val));
-            }
-        }
-    }
-
-    private void checkRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
-        if (oneEntry) {
-            CacheEntry<Integer, TestValue> e = cache.getEntry(i);
-
-            assertNull(e);
-        }
-        else {
-            Set<Integer> set = new HashSet<>();
-
-            for (int j = 0; j < 10; j++)
-                set.add(i + j);
-
-            Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
-
-            assertTrue(es.isEmpty());
-        }
-    }
-
-    private void checkBinaryRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
-        IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
-
-        if (oneEntry) {
-            CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
-
-            assertNull(e);
-        }
-        else {
-            Set<Integer> set = new HashSet<>();
-
-            for (int j = 0; j < 10; j++)
-                set.add(i + j);
-
-            Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
-
-            assertTrue(es.isEmpty());
-        }
-    }
-
-    /** */
-    private void test(IgniteCache<Integer, TestValue> cache,
-        TransactionConcurrency txConcurrency,
-        TransactionIsolation txIsolation,
-        Transaction tx,
-        boolean oneEntry) {
-        if (tx == null) {
-            for (int i : createdBeforeTxKeys()) {
-                checkData(cache, i, oneEntry, null);
-            }
-
-            for (int i : createdBeforeTxWithBinaryKeys()) {
-                checkBinaryData(cache, i, oneEntry, null);
-            }
-
-            for (int i : removedBeforeTxKeys()) {
-                checkRemoved(cache, i, oneEntry);
-            }
-
-            for (int i : removedBeforeTxWithBinaryKeys()) {
-                checkBinaryRemoved(cache, i, oneEntry);
-            }
-        }
-        else {
-            GridCacheVersion txVer = ((TransactionProxyImpl)tx).tx().xidVersion();
-
-            for (int i : createdBeforeTxKeys2()) {
-                checkData(cache, i, oneEntry, null);
-                checkData(cache, i, oneEntry, null);
-            }
-
-            for (int i : createdBeforeTxWithBinaryKeys2()) {
-                checkBinaryData(cache, i, oneEntry, null);
-                checkBinaryData(cache, i, oneEntry, null);
-            }
-
-            try {
-                for (int i : createdBeforeTxKeys3()) {
-                    cache.get(i);
-
-                    checkData(cache, i, oneEntry, null);
-                }
-
-                for (int i : createdBeforeTxWithBinaryKeys3()) {
-                    cache.get(i);
-
-                    checkBinaryData(cache, i, oneEntry, null);
-                }
-
-                assertFalse(txIsolation == TransactionIsolation.REPEATABLE_READ &&
-                    txConcurrency == TransactionConcurrency.OPTIMISTIC);
-            }
-            catch (Exception e) {
-                assertTrue(txIsolation == TransactionIsolation.REPEATABLE_READ &&
-                    txConcurrency == TransactionConcurrency.OPTIMISTIC);
-            }
-
-            for (int i : createdAtTxKeys()) {
-                checkData(cache, i, oneEntry, txVer);
-            }
-
-            for (int i : createdAtTxWithBinaryKeys()) {
-                checkBinaryData(cache, i, oneEntry, txVer);
-            }
-
-            for (int i : removedBeforeTxKeys()) {
-                checkRemoved(cache, i, oneEntry);
-            }
-
-            for (int i : removedBeforeTxWithBinaryKeys()) {
-                checkBinaryRemoved(cache, i, oneEntry);
-            }
-            for (int i : removedAtTxKeys()) {
-                checkRemoved(cache, i, oneEntry);
-            }
-
-            for (int i : removedAtTxWithBinaryKeys()) {
-                checkBinaryRemoved(cache, i, oneEntry);
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestValue implements Serializable {
-        /** */
-        private int val;
-
-        /**
-         * @param val Value.
-         */
-        public TestValue(int val) {
-            this.val = val;
-        }
-
-        /**
-         * @return Value.
-         */
-        public int value() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(TestValue.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
new file mode 100644
index 0000000..c0ba42c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Test getEntry and getEntries methods.
+ */
+public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTest {
+    /** */
+    private static final String UPDATED_ENTRY_ERR = "Impossible to get version for entry updated in transaction";
+
+    /** */
+    private static final String ENTRY_AFTER_GET_ERR = "Impossible to get entry version after get()";
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @return Transaction concurrency.
+     */
+    abstract protected TransactionConcurrency concurrency();
+
+    /**
+     *
+     * @return Transaction isolation.
+     */
+    abstract protected TransactionIsolation isolation();
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(null);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNear() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(ATOMIC);
+        cfg.setName("near");
+        cfg.setNearConfiguration(new NearCacheConfiguration());
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNearTransactional() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setName("nearT");
+        cfg.setNearConfiguration(new NearCacheConfiguration());
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitioned() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(ATOMIC);
+        cfg.setName("partitioned");
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedTransactional() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setName("partitionedT");
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(LOCAL);
+        cfg.setAtomicityMode(ATOMIC);
+        cfg.setName("local");
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalTransactional() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(LOCAL);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setName("localT");
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicated() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(REPLICATED);
+        cfg.setAtomicityMode(ATOMIC);
+        cfg.setName("replicated");
+
+        test(cfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTransactional() throws Exception {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+        cfg.setCacheMode(REPLICATED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setName("replicatedT");
+
+        test(cfg);
+    }
+
+    /**
+     * @param cfg Cache configuration.
+     * @throws Exception If failed.
+     */
+    private void test(CacheConfiguration cfg) throws Exception {
+        test(cfg, true);
+
+        test(cfg, false);
+    }
+
+    /**
+     * @param cfg Cache configuration.
+     * @param oneEntry If {@code true} then single entry is tested.
+     * @throws Exception If failed.
+     */
+    private void test(CacheConfiguration cfg, final boolean oneEntry) throws Exception {
+        final IgniteCache<Integer, TestValue> cache = grid(0).createCache(cfg);
+
+        try {
+            init(cache);
+
+            test(cache, null, null, null, oneEntry);
+
+            if (cfg.getAtomicityMode() == TRANSACTIONAL) {
+                TransactionConcurrency txConcurrency = concurrency();
+                TransactionIsolation txIsolation = isolation();
+
+                try (Transaction tx = grid(0).transactions().txStart(txConcurrency, txIsolation)) {
+                    initTx(cache);
+
+                    test(cache, txConcurrency, txIsolation, tx, oneEntry);
+
+                    tx.commit();
+                }
+
+                testConcurrentTx(cache, OPTIMISTIC, REPEATABLE_READ, oneEntry);
+                testConcurrentTx(cache, OPTIMISTIC, READ_COMMITTED, oneEntry);
+
+                testConcurrentTx(cache, PESSIMISTIC, REPEATABLE_READ, oneEntry);
+                testConcurrentTx(cache, PESSIMISTIC, READ_COMMITTED, oneEntry);
+            }
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param txConcurrency Transaction concurrency.
+     * @param txIsolation Transaction isolation.
+     * @param oneEntry If {@code true} then single entry is tested.
+     * @throws Exception If failed.
+     */
+    private void testConcurrentTx(final IgniteCache<Integer, TestValue> cache,
+        final TransactionConcurrency txConcurrency,
+        final TransactionIsolation txIsolation,
+        final boolean oneEntry) throws Exception {
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                IgniteTransactions txs = grid(0).transactions();
+
+                long stopTime = System.currentTimeMillis() + 3000;
+
+                while (System.currentTimeMillis() < stopTime) {
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    for (int i = 0; i < 100; i++)
+                        keys.add(i);
+
+                    try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
+                        if (oneEntry) {
+                            for (int i = 0; i < 100; i++)
+                                cache.getEntry(i);
+                        }
+                        else
+                            cache.getEntries(keys);
+
+                        for (int i = 0; i < 100; i++)
+                            cache.put(i, new TestValue(i));
+
+                        tx.commit();
+                    }
+                }
+
+                return null;
+            }
+        }, 10, "tx-thread");
+    }
+
+    /**
+     * @param base Start value.
+     * @return Keys.
+     */
+    private Set<Integer> getKeys(int base) {
+        int start = 0;
+        int finish = 100;
+
+        Set<Integer> keys = new HashSet<>(finish - start);
+
+        for (int i = base + start; i < base + finish; ++i)
+            keys.add(i);
+
+        return keys;
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdBeforeTxKeys() {
+        return getKeys(0);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdBeforeTxWithBinaryKeys() {
+        return getKeys(1_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdBeforeTxKeys2() {
+        return getKeys(2_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdBeforeTxWithBinaryKeys2() {
+        return getKeys(3_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdBeforeTxKeys3() {
+        return getKeys(4_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdBeforeTxWithBinaryKeys3() {
+        return getKeys(5_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> removedBeforeTxKeys() {
+        return getKeys(6_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> removedBeforeTxWithBinaryKeys() {
+        return getKeys(7_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdAtTxKeys() {
+        return getKeys(8_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> createdAtTxWithBinaryKeys() {
+        return getKeys(9_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> removedAtTxKeys() {
+        return getKeys(10_000);
+    }
+
+    /**
+     * @return Keys.
+     */
+    private Set<Integer> removedAtTxWithBinaryKeys() {
+        return getKeys(11_000);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void init(IgniteCache<Integer, TestValue> cache) {
+        Set<Integer> keys = new HashSet<>();
+
+        keys.addAll(createdBeforeTxKeys());
+        keys.addAll(createdBeforeTxWithBinaryKeys());
+        keys.addAll(createdBeforeTxKeys2());
+        keys.addAll(createdBeforeTxWithBinaryKeys2());
+        keys.addAll(createdBeforeTxKeys3());
+        keys.addAll(createdBeforeTxWithBinaryKeys3());
+        keys.addAll(removedBeforeTxKeys());
+        keys.addAll(removedBeforeTxWithBinaryKeys());
+        keys.addAll(removedAtTxKeys());
+        keys.addAll(removedAtTxWithBinaryKeys());
+
+        for (int i : keys)
+            cache.put(i, new TestValue(i));
+
+        for (int i : removedBeforeTxKeys())
+            cache.remove(i);
+
+        for (int i : removedBeforeTxWithBinaryKeys())
+            cache.remove(i);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void initTx(IgniteCache<Integer, TestValue> cache) {
+        for (int i : createdAtTxKeys())
+            cache.put(i, new TestValue(i));
+
+        for (int i : createdAtTxWithBinaryKeys())
+            cache.put(i, new TestValue(i));
+
+        for (int i : removedAtTxKeys())
+            cache.remove(i);
+
+        for (int i : removedAtTxWithBinaryKeys())
+            cache.remove(i);
+    }
+
+    /**
+     * @param e Entry.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void compareVersionWithPrimaryNode(CacheEntry<Integer, ?> e, IgniteCache<Integer, TestValue> cache)
+        throws Exception {
+        CacheConfiguration cfg = cache.getConfiguration(CacheConfiguration.class);
+
+        if (cfg.getCacheMode() != LOCAL) {
+            Ignite prim = primaryNode(e.getKey(), cache.getName());
+
+            GridCacheAdapter<Object, Object> cacheAdapter = ((IgniteKernal)prim).internalCache(cache.getName());
+
+            if (cfg.getNearConfiguration() != null)
+                cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
+
+            IgniteCacheObjectProcessor cacheObjects = cacheAdapter.context().cacheObjects();
+
+            CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext();
+
+            GridCacheMapEntry mapEntry = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(
+                cacheObjCtx, e.getKey(), true));
+
+            assertNotNull("No entry for key: " + e.getKey(), mapEntry);
+            assertEquals(mapEntry.version(), e.version());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param i Key.
+     * @param oneEntry If {@code true} then single entry is tested.
+     * @param getVerErr Expected error message prefix if entry version access should fail, {@code null} otherwise.
+     * @param expKeys Expected keys with values.
+     * @throws Exception If failed.
+     */
+    private void checkData(IgniteCache<Integer, TestValue> cache,
+        int i,
+        boolean oneEntry,
+        @Nullable String getVerErr,
+        Set<Integer> expKeys) throws Exception {
+        if (oneEntry) {
+            final CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+            if (getVerErr == null)
+                compareVersionWithPrimaryNode(e, cache);
+            else {
+                Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        e.version();
+
+                        return null;
+                    }
+                }, IgniteException.class, null);
+
+                assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+            }
+
+            assertEquals(e.getValue().val, i);
+        }
+        else {
+            Set<Integer> set = new HashSet<>();
+
+            int expCnt = 0;
+
+            for (int j = 0; j < 10; j++) {
+                Integer key = i + j;
+
+                set.add(key);
+
+                if (expKeys.contains(key))
+                    expCnt++;
+            }
+
+            Collection<CacheEntry<Integer, TestValue>> entries = cache.getEntries(set);
+
+            assertEquals(expCnt, entries.size());
+
+            for (final CacheEntry<Integer, TestValue> e : entries) {
+                if (getVerErr == null)
+                    compareVersionWithPrimaryNode(e, cache);
+                else {
+                    Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            e.version();
+
+                            return null;
+                        }
+                    }, IgniteException.class, null);
+
+                    assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+                }
+
+                assertEquals((Integer)e.getValue().val, e.getKey());
+
+                assertTrue(set.contains(e.getValue().val));
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param i Key.
+     * @param oneEntry If {@code true} then single entry is tested.
+     * @param getVerErr Expected error message prefix if entry version access should fail, {@code null} otherwise.
+     * @param expKeys Expected keys with values.
+     * @throws Exception If failed.
+     */
+    private void checkBinaryData(IgniteCache<Integer, TestValue> cache,
+        int i,
+        boolean oneEntry,
+        @Nullable String getVerErr,
+        Set<Integer> expKeys) throws Exception {
+        IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
+
+        if (oneEntry) {
+            final CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+            if (getVerErr == null)
+                compareVersionWithPrimaryNode(e, cache);
+            else {
+                Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        e.version();
+
+                        return null;
+                    }
+                }, IgniteException.class, null);
+
+                assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+            }
+
+            assertEquals(((TestValue)e.getValue().deserialize()).val, i);
+        }
+        else {
+            Set<Integer> set = new HashSet<>();
+
+            int expCnt = 0;
+
+            for (int j = 0; j < 10; j++) {
+                Integer key = i + j;
+
+                set.add(key);
+
+                if (expKeys.contains(key))
+                    expCnt++;
+            }
+
+            Collection<CacheEntry<Integer, BinaryObject>> entries = cacheB.getEntries(set);
+
+            assertEquals(expCnt, entries.size());
+
+            for (final CacheEntry<Integer, BinaryObject> e : entries) {
+                if (getVerErr == null)
+                    compareVersionWithPrimaryNode(e, cache);
+                else {
+                    Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            e.version();
+
+                            return null;
+                        }
+                    }, IgniteException.class, null);
+
+                    assertTrue("Unexpected error message: " + err.getMessage(), err.getMessage().startsWith(getVerErr));
+                }
+
+                TestValue tv = e.getValue().deserialize();
+
+                assertEquals((Integer)tv.val, e.getKey());
+
+                assertTrue(set.contains((tv).val));
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param i Key.
+     * @param oneEntry If {@code true} then single entry is tested.
+     */
+    private void checkRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
+        if (oneEntry) {
+            CacheEntry<Integer, TestValue> e = cache.getEntry(i);
+
+            assertNull(e);
+        }
+        else {
+            Set<Integer> set = new HashSet<>();
+
+            for (int j = 0; j < 10; j++)
+                set.add(i + j);
+
+            Collection<CacheEntry<Integer, TestValue>> es = cache.getEntries(set);
+
+            assertTrue(es.isEmpty());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param i Key.
+     * @param oneEntry If {@code true} then single entry is tested.
+     */
+    private void checkBinaryRemoved(IgniteCache<Integer, TestValue> cache, int i, boolean oneEntry) {
+        IgniteCache<Integer, BinaryObject> cacheB = cache.withKeepBinary();
+
+        if (oneEntry) {
+            CacheEntry<Integer, BinaryObject> e = cacheB.getEntry(i);
+
+            assertNull(e);
+        }
+        else {
+            Set<Integer> set = new HashSet<>();
+
+            for (int j = 0; j < 10; j++)
+                set.add(i + j);
+
+            Collection<CacheEntry<Integer, BinaryObject>> es = cacheB.getEntries(set);
+
+            assertTrue(es.isEmpty());
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param txConcurrency Transaction concurrency.
+     * @param txIsolation Transaction isolation.
+     * @param tx Transaction.
+     * @param oneEntry If {@code true} then single entry is tested.
+     * @throws Exception If failed.
+     */
+    private void test(IgniteCache<Integer, TestValue> cache,
+        TransactionConcurrency txConcurrency,
+        TransactionIsolation txIsolation,
+        Transaction tx,
+        boolean oneEntry) throws Exception {
+        if (tx == null) {
+            Set<Integer> keys = createdBeforeTxKeys();
+
+            for (int i : keys)
+                checkData(cache, i, oneEntry, null, keys);
+
+            keys = createdBeforeTxWithBinaryKeys();
+
+            for (int i : keys)
+                checkBinaryData(cache, i, oneEntry, null, keys);
+
+            for (int i : removedBeforeTxKeys())
+                checkRemoved(cache, i, oneEntry);
+
+            for (int i : removedBeforeTxWithBinaryKeys())
+                checkBinaryRemoved(cache, i, oneEntry);
+        }
+        else {
+            Set<Integer> keys = createdBeforeTxKeys2();
+
+            for (int i : keys) {
+                checkData(cache, i, oneEntry, null, keys);
+                checkData(cache, i, oneEntry, null, keys);
+            }
+
+            keys = createdBeforeTxWithBinaryKeys2();
+
+            for (int i : keys) {
+                checkBinaryData(cache, i, oneEntry, null, keys);
+                checkBinaryData(cache, i, oneEntry, null, keys);
+            }
+
+            String verGetErr = null;
+
+            if (txConcurrency == OPTIMISTIC && txIsolation == REPEATABLE_READ)
+                verGetErr = ENTRY_AFTER_GET_ERR;
+
+            keys = createdBeforeTxKeys3();
+
+            for (int i : keys) {
+                if (oneEntry)
+                    cache.get(i);
+                else {
+                    Set<Integer> set = new HashSet<>();
+
+                    for (int j = 0; j < 10; j++)
+                        set.add(i + j);
+
+                    cache.getAll(set);
+                }
+
+                checkData(cache, i, oneEntry, verGetErr, keys);
+            }
+
+            keys = createdBeforeTxWithBinaryKeys3();
+
+            for (int i : keys) {
+                if (oneEntry)
+                    cache.get(i);
+                else {
+                    Set<Integer> set = new HashSet<>();
+
+                    for (int j = 0; j < 10; j++)
+                        set.add(i + j);
+
+                    cache.getAll(set);
+                }
+
+                checkBinaryData(cache, i, oneEntry, verGetErr, keys);
+            }
+
+            keys = createdAtTxKeys();
+
+            for (int i : keys)
+                checkData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys);
+
+            keys = createdAtTxWithBinaryKeys();
+
+            for (int i : keys)
+                checkBinaryData(cache, i, oneEntry, UPDATED_ENTRY_ERR, keys);
+
+            for (int i : removedBeforeTxKeys())
+                checkRemoved(cache, i, oneEntry);
+
+            for (int i : removedBeforeTxWithBinaryKeys())
+                checkBinaryRemoved(cache, i, oneEntry);
+
+            for (int i : removedAtTxKeys())
+                checkRemoved(cache, i, oneEntry);
+
+            for (int i : removedAtTxWithBinaryKeys())
+                checkBinaryRemoved(cache, i, oneEntry);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestValue implements Serializable {
+        /** */
+        private int val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(int val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public int value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
index b7cd28b..acc21df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticReadCommittedSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Test getEntry and getEntries methods.
  */
-public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryOptimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest {
+    /** {@inheritDoc} */
     @Override protected TransactionConcurrency concurrency() {
         return TransactionConcurrency.OPTIMISTIC;
     }
 
+    /** {@inheritDoc} */
     @Override protected TransactionIsolation isolation() {
         return TransactionIsolation.READ_COMMITTED;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
index d8453ba..6153869 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticRepeatableReadSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Test getEntry and getEntries methods.
  */
-public class CacheGetEntryOptimisticRepeatableReadSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryOptimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest {
+    /** {@inheritDoc} */
     @Override protected TransactionConcurrency concurrency() {
         return TransactionConcurrency.OPTIMISTIC;
     }
 
+    /** {@inheritDoc} */
     @Override protected TransactionIsolation isolation() {
         return TransactionIsolation.REPEATABLE_READ;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
index 4fbe66e..6ded4a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryOptimisticSerializableSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Test getEntry and getEntries methods.
  */
-public class CacheGetEntryOptimisticSerializableSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryOptimisticSerializableSeltTest extends CacheGetEntryAbstractTest {
+    /** {@inheritDoc} */
     @Override protected TransactionConcurrency concurrency() {
         return TransactionConcurrency.OPTIMISTIC;
     }
 
+    /** {@inheritDoc} */
     @Override protected TransactionIsolation isolation() {
         return TransactionIsolation.SERIALIZABLE;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
index 86d6e4e..975d271 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticReadCommittedSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Test getEntry and getEntries methods.
  */
-public class CacheGetEntryPessimisticReadCommittedSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryPessimisticReadCommittedSeltTest extends CacheGetEntryAbstractTest {
+    /** {@inheritDoc} */
     @Override protected TransactionConcurrency concurrency() {
         return TransactionConcurrency.PESSIMISTIC;
     }
 
+    /** {@inheritDoc} */
     @Override protected TransactionIsolation isolation() {
         return TransactionIsolation.READ_COMMITTED;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
index 7fc7aa3..dac64d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticRepeatableReadSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Test getEntry and getEntries methods.
  */
-public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryPessimisticRepeatableReadSeltTest extends CacheGetEntryAbstractTest {
+    /** {@inheritDoc} */
     @Override protected TransactionConcurrency concurrency() {
         return TransactionConcurrency.PESSIMISTIC;
     }
 
+    /** {@inheritDoc} */
     @Override protected TransactionIsolation isolation() {
         return TransactionIsolation.REPEATABLE_READ;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21567fab/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
index da28eca..70f71ce 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryPessimisticSerializableSeltTest.java
@@ -23,12 +23,13 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Test getEntry and getEntries methods.
  */
-public class CacheGetEntryPessimisticSerializableSeltTest extends CacheGetEntryAbstractSeltTest {
-
+public class CacheGetEntryPessimisticSerializableSeltTest extends CacheGetEntryAbstractTest {
+    /** {@inheritDoc} */
     @Override protected TransactionConcurrency concurrency() {
         return TransactionConcurrency.PESSIMISTIC;
     }
 
+    /** {@inheritDoc} */
     @Override protected TransactionIsolation isolation() {
         return TransactionIsolation.SERIALIZABLE;
     }


[03/12] ignite git commit: gc

Posted by sb...@apache.org.
gc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/861236a1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/861236a1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/861236a1

Branch: refs/heads/ignite-2224
Commit: 861236a16851317d3c580eb5a72ef8e5f8561b80
Parents: 8a8a8c1
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Jan 29 18:43:10 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Jan 29 18:43:10 2016 +0300

----------------------------------------------------------------------
 .../rebalancing/GridCacheRebalancingSyncSelfTest.java        | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/861236a1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index 4ee080f..e4ad66b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -177,6 +178,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        GridTestUtils.runGC(); // Clean heap before rebalancing.
+    }
+
     /**
      * @throws Exception If failed.
      */


[02/12] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main

Posted by sb...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8a8a8c17
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8a8a8c17
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8a8a8c17

Branch: refs/heads/ignite-2224
Commit: 8a8a8c17602505adb4234bdd440b2ae20ee9a25e
Parents: c2e5638 d2a107b
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Jan 29 15:52:38 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Jan 29 15:52:38 2016 +0300

----------------------------------------------------------------------
 .../examples/datagrid/CacheAffinityExample.java |   8 +-
 .../java8/datagrid/CacheAffinityExample.java    |   6 +-
 modules/benchmarks/pom.xml                      |  32 +-
 .../benchmarks/jmh/JmhAbstractBenchmark.java    | 150 ++++++++++
 .../jmh/cache/JmhCacheAbstractBenchmark.java    | 181 ++++++++++++
 .../jmh/cache/JmhCachePutBenchmark.java         | 124 ++++++++
 .../benchmarks/jmh/cache/PutBenchmark.java      | 170 -----------
 .../jmh/runner/JmhIdeBenchmarkRunner.java       | 232 +++++++++++++++
 .../internal/client/ClientGetAffinityTask.java  |   4 +-
 .../java/org/apache/ignite/IgniteCluster.java   |   7 +-
 .../apache/ignite/cache/affinity/Affinity.java  |  24 +-
 .../affinity/GridAffinityProcessor.java         |  60 ++--
 .../cache/GridCacheAffinityManager.java         |  47 ++-
 .../cache/affinity/GridCacheAffinityImpl.java   |  48 ++-
 .../dht/preloader/GridDhtPartitionDemander.java |   4 +-
 .../near/GridNearTxFinishFuture.java            |  96 +++---
 .../ignite/internal/GridAffinityMappedTest.java |   8 +-
 .../internal/GridAffinityNoCacheSelfTest.java   | 290 +++++++++++++++++++
 .../internal/GridAffinityP2PSelfTest.java       |   8 +-
 .../ignite/internal/GridAffinitySelfTest.java   |   8 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   4 +-
 .../cache/GridCacheAffinityRoutingSelfTest.java |  10 +-
 .../GridCacheConcurrentTxMultiNodeTest.java     |   4 +-
 .../GridCacheDaemonNodeAbstractSelfTest.java    |  17 +-
 .../cache/GridCacheDeploymentSelfTest.java      |   8 +-
 .../cache/GridCacheEntryMemorySizeSelfTest.java |   6 +-
 ...hePartitionedProjectionAffinitySelfTest.java |   8 +-
 .../cache/GridCachePutAllFailoverSelfTest.java  |   4 +-
 .../dht/GridCacheDhtMultiBackupTest.java        |   4 +-
 .../near/GridCacheNearOnlyTopologySelfTest.java |   4 +-
 .../near/GridCacheNearTxMultiNodeSelfTest.java  |   4 +-
 ...titionedExplicitLockNodeFailureSelfTest.java |   6 +-
 ...ridCacheContinuousQueryAbstractSelfTest.java |  10 +-
 .../processors/igfs/IgfsStreamsSelfTest.java    |   4 +-
 .../GridServicePackagePrivateSelfTest.java      |  17 +-
 .../ignite/loadtests/dsi/GridDsiClient.java     |   4 +-
 .../tcp/GridCacheDhtLockBackupSelfTest.java     |   4 +-
 .../testsuites/IgniteComputeGridTestSuite.java  |   4 +-
 modules/docker/1.5.0.final/Dockerfile           |  40 +++
 modules/docker/1.5.0.final/run.sh               |  50 ++++
 modules/docker/Dockerfile                       |   6 +-
 41 files changed, 1343 insertions(+), 382 deletions(-)
----------------------------------------------------------------------



[10/12] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-2224

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-2224


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fdafe285
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fdafe285
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fdafe285

Branch: refs/heads/ignite-2224
Commit: fdafe2858ce784ca7a6fcc2c63b2069431f01755
Parents: e7caaa0 4e61602
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 1 09:39:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 1 09:39:37 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |   4 +-
 .../processors/cache/CacheOperationContext.java |  43 ++-
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheProxyImpl.java    |  11 +-
 .../processors/cache/IgniteCacheProxy.java      |  43 ++-
 .../dht/atomic/GridDhtAtomicCache.java          | 103 ++++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   4 +-
 .../processors/cache/dr/GridCacheDrInfo.java    |  49 ++-
 .../transactions/IgniteTxLocalAdapter.java      |  81 ++--
 .../cache/version/GridCacheVersionManager.java  |  23 +-
 .../GridCacheRebalancingSyncSelfTest.java       |   8 +
 .../testframework/junits/GridAbstractTest.java  |   3 +
 modules/kafka/README.txt                        | 111 +++++-
 modules/kafka/pom.xml                           |  69 ++--
 .../ignite/stream/kafka/KafkaStreamer.java      |   2 +-
 .../kafka/connect/IgniteSinkConnector.java      |  91 +++++
 .../kafka/connect/IgniteSinkConstants.java      |  38 ++
 .../stream/kafka/connect/IgniteSinkTask.java    | 165 ++++++++
 .../kafka/IgniteKafkaStreamerSelfTestSuite.java |   9 +-
 .../stream/kafka/KafkaEmbeddedBroker.java       | 387 -------------------
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  13 +-
 .../ignite/stream/kafka/SimplePartitioner.java  |  53 ---
 .../ignite/stream/kafka/TestKafkaBroker.java    | 237 ++++++++++++
 .../kafka/connect/IgniteSinkConnectorTest.java  | 250 ++++++++++++
 .../kafka/src/test/resources/example-ignite.xml |  71 ++++
 .../commands/top/VisorTopologyCommand.scala     |   5 +-
 .../apache/ignite/yarn/ApplicationMaster.java   |  12 +-
 .../apache/ignite/yarn/ClusterProperties.java   | 144 ++++---
 .../yarn/IgniteApplicationMasterSelfTest.java   |  52 +++
 parent/pom.xml                                  | 120 +++---
 pom.xml                                         |  16 -
 31 files changed, 1516 insertions(+), 709 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/fdafe285/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------


[11/12] ignite git commit: Merge remote-tracking branch 'origin/ignite-2224' into ignite-2224

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-2224' into ignite-2224


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ede58b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ede58b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ede58b5

Branch: refs/heads/ignite-2224
Commit: 4ede58b5f12aaaeb5e135a530ddf108437c0bd0e
Parents: fdafe28 eccbd0d
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 1 11:51:48 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 1 11:51:48 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/cache/GridCacheAdapter.java     | 2 +-
 .../processors/cache/GridCacheInterceptorAbstractSelfTest.java | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4ede58b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------


[06/12] ignite git commit: Revert PR with wrong author.

Posted by sb...@apache.org.
Revert PR with wrong author.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/304370cf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/304370cf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/304370cf

Branch: refs/heads/ignite-2224
Commit: 304370cf7500f259e7a3a4699dab9f08a7609200
Parents: e5fda53
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Feb 1 10:43:45 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 1 10:43:45 2016 +0700

----------------------------------------------------------------------
 .../apache/ignite/visor/commands/top/VisorTopologyCommand.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/304370cf/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
index d2ec662..5e278ed 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
@@ -263,13 +263,12 @@ class VisorTopologyCommand extends VisorConsoleCommand {
 
         val hostsT = VisorTextTable()
 
-        hostsT #= ("Int./Ext. IPs", "Node ID8(@)","Node Type", "OS", "CPUs", "MACs", "CPU Load")
+        hostsT #= ("Int./Ext. IPs", "Node ID8(@)", "OS", "CPUs", "MACs", "CPU Load")
 
         neighborhood.foreach {
             case (_, neighbors) =>
                 var ips = Set.empty[String]
                 var id8s = List.empty[String]
-                var nodeTypes = List.empty[String]
                 var macs = Set.empty[String]
                 var cpuLoadSum = 0.0
 
@@ -288,7 +287,6 @@ class VisorTopologyCommand extends VisorConsoleCommand {
                 neighbors.foreach(n => {
                     id8s = id8s :+ (i.toString + ": " + nodeId8(n.id))
 
-                    nodeTypes = nodeTypes :+ (if (n.isClient) "Client" else "Server")
                     i += 1
 
                     ips = ips ++ n.addresses()
@@ -302,7 +300,6 @@ class VisorTopologyCommand extends VisorConsoleCommand {
                 hostsT += (
                     ips.toSeq,
                     id8s,
-                    nodeTypes,
                     os,
                     cpus,
                     macs.toSeq,


[08/12] ignite git commit: Fixed IGNITE-2419 Ignite on YARN do not handle memory overhead. This closes #414.

Posted by sb...@apache.org.
Fixed IGNITE-2419 Ignite on YARN do not handle memory overhead. This closes #414.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1945b988
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1945b988
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1945b988

Branch: refs/heads/ignite-2224
Commit: 1945b98849cb52fc297c726fa79a270706135581
Parents: 5969129
Author: Edouard Chevalier <ed...@techmydata.net>
Authored: Mon Feb 1 07:34:59 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Feb 1 07:34:59 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/yarn/ApplicationMaster.java   |  12 +-
 .../apache/ignite/yarn/ClusterProperties.java   | 144 +++++++++++--------
 .../yarn/IgniteApplicationMasterSelfTest.java   |  52 +++++++
 3 files changed, 140 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1945b988/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
index 755e4e4..b9ab02d 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java
@@ -137,8 +137,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
                             + "cp -r ./libs/* ./ignite/*/libs/ || true && "
                             + "./ignite/*/bin/ignite.sh "
                             + "./ignite-config.xml"
-                            + " -J-Xmx" + c.getResource().getMemory() + "m"
-                            + " -J-Xms" + c.getResource().getMemory() + "m"
+                            + " -J-Xmx" + ((int)props.memoryPerNode()) + "m"
+                            + " -J-Xms" + ((int)props.memoryPerNode()) + "m"
                             + IgniteYarnUtils.YARN_LOG_OUT
                         ));
 
@@ -178,7 +178,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
 
         // Check that slave satisfies min requirements.
         if (cont.getResource().getVirtualCores() < props.cpusPerNode()
-            || cont.getResource().getMemory() < props.memoryPerNode()) {
+            || cont.getResource().getMemory() < props.totalMemoryPerNode()) {
             log.log(Level.FINE, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}",
                 new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(),
                    cont.getResource().getMemory()});
@@ -291,7 +291,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
                     // Resource requirements for worker containers.
                     Resource capability = Records.newRecord(Resource.class);
 
-                    capability.setMemory((int)props.memoryPerNode());
+                    capability.setMemory((int)props.totalMemoryPerNode());
                     capability.setVirtualCores((int)props.cpusPerNode());
 
                     for (int i = 0; i < props.instances() - runningCnt; ++i) {
@@ -302,7 +302,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
                         rmClient.addContainerRequest(containerAsk);
 
                         log.log(Level.INFO, "Making request. Memory: {0}, cpu {1}.",
-                            new Object[]{props.memoryPerNode(), props.cpusPerNode()});
+                            new Object[]{props.totalMemoryPerNode(), props.cpusPerNode()});
                     }
                 }
 
@@ -329,7 +329,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler {
     private boolean checkAvailableResource() {
         Resource availableRes = rmClient.getAvailableResources();
 
-        return availableRes == null || availableRes.getMemory() >= props.memoryPerNode()
+        return availableRes == null || availableRes.getMemory() >= props.totalMemoryPerNode()
             && availableRes.getVirtualCores() >= props.cpusPerNode();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1945b988/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
index d040e9f..647aef2 100644
--- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
+++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java
@@ -48,6 +48,12 @@ public class ClusterProperties {
 
     /** */
     public static final double DEFAULT_MEM_PER_NODE = 2048;
+    
+    /**
+     * Minimum memory overhead per node: when not set explicitly, the overhead is 0.1 * MEMORY_PER_NODE,
+     * but not less than DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE.
+     */
+    public static final double DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE = 384;
 
     /** Cluster name. */
     private String clusterName = DEFAULT_CLUSTER_NAME;
@@ -63,6 +69,12 @@ public class ClusterProperties {
 
     /** Memory limit. */
     private double memPerNode = DEFAULT_MEM_PER_NODE;
+    
+    /** */
+    public static final String IGNITE_MEMORY_OVERHEAD_PER_NODE = "IGNITE_MEMORY_OVERHEAD_PER_NODE";
+
+    /** Memory overhead to request from YARN. */
+    private double memOverHeadPerNode = 0;
 
     /** */
     public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT";
@@ -185,7 +197,30 @@ public class ClusterProperties {
     }
 
     /**
-     * @return instance count limit.
+     * @return Memory overhead for requested memory.
+     */
+    public double memoryOverHeadPerNode() {
+		return memOverHeadPerNode;
+	}
+
+    /**
+     * Sets the memory overhead requested from YARN.
+     *
+     * @param memOverHeadPerNode Memory overhead per node.
+     */
+	public void memoryOverHeadPerNode(double memOverHeadPerNode) {
+		this.memOverHeadPerNode = memOverHeadPerNode;
+	}
+	
+	/**
+	 * @return Total memory requested from the ResourceManager (memoryPerNode + memoryOverHeadPerNode).
+	 */
+	public double totalMemoryPerNode(){
+		return memoryPerNode() + memoryOverHeadPerNode();
+	}
+
+	/**
+     * @return Instance count limit.
      */
     public double instances() {
         return nodeCnt;
@@ -278,7 +313,50 @@ public class ClusterProperties {
     }
 
     /**
-     * @param config path to config file.
+     * Instantiate a ClusterProperties from a set of properties.
+     *
+     * @param props Properties to read from; if {@code null}, system properties are used.
+     * @return Cluster properties.
+     */
+    private static ClusterProperties fromProperties(Properties props) {
+    	ClusterProperties prop = new ClusterProperties();
+
+        prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
+
+        prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, DEFAULT_CPU_PER_NODE);
+        prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, DEFAULT_MEM_PER_NODE);
+        // Default memory overhead is 0.1 * memPerNode,
+        // but not less than DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE.
+        prop.memOverHeadPerNode = getDoubleProperty(IGNITE_MEMORY_OVERHEAD_PER_NODE, props,
+            Math.max( 0.1 * prop.memPerNode, DEFAULT_MINIMUM_MEM_OVERHEAD_PER_NODE));
+        prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, DEFAULT_IGNITE_NODE_COUNT);
+
+        prop.igniteUrl = getStringProperty(IGNITE_URL, props, null);
+        prop.ignitePath = getStringProperty(IGNITE_PATH, props, null);
+        prop.licencePath = getStringProperty(LICENCE_PATH, props, null);
+        prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, props, null);
+        prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
+        prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
+        prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
+        prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
+        prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
+
+        String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null);
+
+        if (pattern != null) {
+            try {
+                prop.hostnameConstraint = Pattern.compile(pattern);
+            }
+            catch (PatternSyntaxException e) {
+                log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
+            }
+        }
+
+        return prop;
+    }
+    
+    /**
+     * @param config Path to config file.
      * @return Cluster configuration.
      */
     public static ClusterProperties from(String config) {
@@ -291,36 +369,7 @@ public class ClusterProperties {
                 props.load(new FileInputStream(config));
             }
 
-            ClusterProperties prop = new ClusterProperties();
-
-            prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME);
-
-            prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, DEFAULT_CPU_PER_NODE);
-            prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, DEFAULT_MEM_PER_NODE);
-            prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, DEFAULT_IGNITE_NODE_COUNT);
-
-            prop.igniteUrl = getStringProperty(IGNITE_URL, props, null);
-            prop.ignitePath = getStringProperty(IGNITE_PATH, props, null);
-            prop.licencePath = getStringProperty(LICENCE_PATH, props, null);
-            prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, props, null);
-            prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR);
-            prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR);
-            prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR);
-            prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null);
-            prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null);
-
-            String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null);
-
-            if (pattern != null) {
-                try {
-                    prop.hostnameConstraint = Pattern.compile(pattern);
-                }
-                catch (PatternSyntaxException e) {
-                    log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
-                }
-            }
-
-            return prop;
+            return fromProperties(props);
         }
         catch (IOException e) {
             throw new RuntimeException(e);
@@ -331,36 +380,7 @@ public class ClusterProperties {
      * @return Cluster configuration.
      */
     public static ClusterProperties from() {
-        ClusterProperties prop = new ClusterProperties();
-
-        prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME);
-
-        prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, DEFAULT_CPU_PER_NODE);
-        prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, DEFAULT_MEM_PER_NODE);
-        prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, DEFAULT_IGNITE_NODE_COUNT);
-
-        prop.igniteUrl = getStringProperty(IGNITE_URL, null, null);
-        prop.ignitePath = getStringProperty(IGNITE_PATH, null, null);
-        prop.licencePath = getStringProperty(LICENCE_PATH, null, null);
-        prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, null, null);
-        prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR);
-        prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, null, DEFAULT_IGNITE_LOCAL_WORK_DIR);
-        prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, null, DEFAULT_IGNITE_RELEASES_DIR);
-        prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null);
-        prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null);
-
-        String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, null, null);
-
-        if (pattern != null) {
-            try {
-                prop.hostnameConstraint = Pattern.compile(pattern);
-            }
-            catch (PatternSyntaxException e) {
-                log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e);
-            }
-        }
-
-        return prop;
+        return fromProperties(null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1945b988/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
index 97f6a12..1190313 100644
--- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
+++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java
@@ -103,6 +103,58 @@ public class IgniteApplicationMasterSelfTest extends TestCase {
             assertEquals(1024, req.getCapability().getMemory());
         }
     }
+    
+    /**
+     * Tests that memory overhead is added to the requested container memory.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMemoryOverHeadAllocation() throws Exception {
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(new NMMock());
+
+        props.cpusPerNode(2);
+        props.memoryPerNode(1024);
+        props.memoryOverHeadPerNode(512);
+        props.instances(3);
+
+        Thread thread = runAppMaster(appMaster);
+
+        List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+        interruptedThread(thread);
+
+        assertEquals(3, contRequests.size());
+
+        for (AMRMClient.ContainerRequest req : contRequests) {
+            assertEquals(2, req.getCapability().getVirtualCores());
+            assertEquals(1024 + 512, req.getCapability().getMemory());
+        }
+    }
+
+    /**
+     * Tests that no container is requested when memory plus overhead exceeds the available resources.
+     *
+     * @throws Exception If failed.
+     */
+     public void testMemoryOverHeadPreventAllocation() throws Exception {
+        rmMock.availableRes(new MockResource(1024, 2));
+        appMaster.setRmClient(rmMock);
+        appMaster.setNmClient(new NMMock());
+
+        props.cpusPerNode(2);
+        props.memoryPerNode(1024);
+        props.memoryOverHeadPerNode(512);
+        props.instances(3);
+
+        Thread thread = runAppMaster(appMaster);
+
+        List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000);
+
+        interruptedThread(thread);
+
+        assertEquals(0, contRequests.size());
+     }
 
     /**
      * @throws Exception If failed.


[05/12] ignite git commit: IGNITE-1069 Added output of node type (server or client) in Visor commandline top command.

Posted by sb...@apache.org.
IGNITE-1069 Added output of node type (server or client) in Visor commandline top command.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5fda53a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5fda53a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5fda53a

Branch: refs/heads/ignite-2224
Commit: e5fda53ae5f0e64e63f97db9d664081d24b34fa7
Parents: c92c274
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Feb 1 10:08:21 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 1 10:08:21 2016 +0700

----------------------------------------------------------------------
 .../apache/ignite/visor/commands/top/VisorTopologyCommand.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e5fda53a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
index 5e278ed..d2ec662 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
@@ -263,12 +263,13 @@ class VisorTopologyCommand extends VisorConsoleCommand {
 
         val hostsT = VisorTextTable()
 
-        hostsT #= ("Int./Ext. IPs", "Node ID8(@)", "OS", "CPUs", "MACs", "CPU Load")
+        hostsT #= ("Int./Ext. IPs", "Node ID8(@)","Node Type", "OS", "CPUs", "MACs", "CPU Load")
 
         neighborhood.foreach {
             case (_, neighbors) =>
                 var ips = Set.empty[String]
                 var id8s = List.empty[String]
+                var nodeTypes = List.empty[String]
                 var macs = Set.empty[String]
                 var cpuLoadSum = 0.0
 
@@ -287,6 +288,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
                 neighbors.foreach(n => {
                     id8s = id8s :+ (i.toString + ": " + nodeId8(n.id))
 
+                    nodeTypes = nodeTypes :+ (if (n.isClient) "Client" else "Server")
                     i += 1
 
                     ips = ips ++ n.addresses()
@@ -300,6 +302,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
                 hostsT += (
                     ips.toSeq,
                     id8s,
+                    nodeTypes,
                     os,
                     cpus,
                     macs.toSeq,


[09/12] ignite git commit: Fixed conflict resolver API.

Posted by sb...@apache.org.
Fixed conflict resolver API.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e61602e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e61602e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e61602e

Branch: refs/heads/ignite-2224
Commit: 4e61602eca679bf3689bb23f2bc1c9e58b4eb8dc
Parents: 1945b98
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Mon Jan 25 19:16:47 2016 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Mon Feb 1 07:43:59 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheOperationContext.java |  43 +++++--
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheProxyImpl.java    |  11 +-
 .../processors/cache/IgniteCacheProxy.java      |  43 ++++++-
 .../dht/atomic/GridDhtAtomicCache.java          | 104 +++++++++++++----
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   4 +-
 .../processors/cache/dr/GridCacheDrInfo.java    |  49 +++++++-
 .../transactions/IgniteTxLocalAdapter.java      |  81 +++++++++-----
 .../cache/version/GridCacheVersionManager.java  |  23 ++--
 .../testframework/junits/GridAbstractTest.java  |   3 +
 parent/pom.xml                                  | 111 +++++++++++--------
 pom.xml                                         |  16 ---
 12 files changed, 351 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 21934d0..f39a09d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -48,6 +48,9 @@ public class CacheOperationContext implements Serializable {
     /** Expiry policy. */
     private final ExpiryPolicy expiryPlc;
 
+    /** Data center Id. */
+    private final Byte dataCenterId;
+
     /**
      * Constructor with default values.
      */
@@ -61,6 +64,8 @@ public class CacheOperationContext implements Serializable {
         expiryPlc = null;
 
         noRetries = false;
+
+        dataCenterId = null;
     }
 
     /**
@@ -68,13 +73,15 @@ public class CacheOperationContext implements Serializable {
      * @param subjId Subject ID.
      * @param keepBinary Keep binary flag.
      * @param expiryPlc Expiry policy.
+     * @param dataCenterId Data center id.
      */
     public CacheOperationContext(
         boolean skipStore,
         @Nullable UUID subjId,
         boolean keepBinary,
         @Nullable ExpiryPolicy expiryPlc,
-        boolean noRetries) {
+        boolean noRetries,
+        @Nullable Byte dataCenterId) {
         this.skipStore = skipStore;
 
         this.subjId = subjId;
@@ -84,6 +91,8 @@ public class CacheOperationContext implements Serializable {
         this.expiryPlc = expiryPlc;
 
         this.noRetries = noRetries;
+
+        this.dataCenterId = dataCenterId;
     }
 
     /**
@@ -94,6 +103,13 @@ public class CacheOperationContext implements Serializable {
     }
 
     /**
+     * @return {@code True} if data center id is set, otherwise {@code false}.
+     */
+    public boolean hasDataCenterId() {
+        return dataCenterId != null;
+    }
+
+    /**
      * See {@link IgniteInternalCache#keepBinary()}.
      *
      * @return New instance of CacheOperationContext with keep binary flag.
@@ -104,7 +120,8 @@ public class CacheOperationContext implements Serializable {
             subjId,
             true,
             expiryPlc,
-            noRetries);
+            noRetries,
+            dataCenterId);
     }
 
     /**
@@ -117,6 +134,15 @@ public class CacheOperationContext implements Serializable {
     }
 
     /**
+     * Gets data center ID.
+     *
+     * @return Data center ID, or {@code null} if it is not set.
+     */
+    @Nullable public Byte dataCenterId() {
+        return dataCenterId;
+    }
+
+    /**
      * See {@link IgniteInternalCache#forSubjectId(UUID)}.
      *
      * @param subjId Subject id.
@@ -128,7 +154,8 @@ public class CacheOperationContext implements Serializable {
             subjId,
             keepBinary,
             expiryPlc,
-            noRetries);
+            noRetries,
+            dataCenterId);
     }
 
     /**
@@ -150,7 +177,8 @@ public class CacheOperationContext implements Serializable {
             subjId,
             keepBinary,
             expiryPlc,
-            noRetries);
+            noRetries,
+            dataCenterId);
     }
 
     /**
@@ -172,7 +200,8 @@ public class CacheOperationContext implements Serializable {
             subjId,
             true,
             plc,
-            noRetries);
+            noRetries,
+            dataCenterId);
     }
 
     /**
@@ -185,8 +214,8 @@ public class CacheOperationContext implements Serializable {
             subjId,
             keepBinary,
             expiryPlc,
-            noRetries
-        );
+            noRetries,
+            dataCenterId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3081cfb..9fd65e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -447,7 +447,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
-        CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false);
+        CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false, null);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);
     }
@@ -459,14 +459,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
-        CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false);
+        CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false, null);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);
     }
 
     /** {@inheritDoc} */
     @Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
-        CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false);
+        CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false, null);
 
         return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
     }
@@ -483,7 +483,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         assert !CU.isAtomicsCache(ctx.name());
         assert !CU.isMarshallerCache(ctx.name());
 
-        CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false);
+        CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false, null);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 8ffd273..3a53942 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -209,7 +209,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
         return new GridCacheProxyImpl<>(ctx, delegate,
-            opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false));
+            opCtx != null ? opCtx.forSubjectId(subjId) :
+                new CacheOperationContext(false, subjId, false, null, false, null));
     }
 
     /** {@inheritDoc} */
@@ -221,7 +222,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
                 return this;
 
             return new GridCacheProxyImpl<>(ctx, delegate,
-                opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false));
+                opCtx != null ? opCtx.setSkipStore(skipStore) :
+                    new CacheOperationContext(true, null, false, null, false, null));
         }
         finally {
             gate.leave(prev);
@@ -236,7 +238,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
 
         return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx,
             (GridCacheAdapter<K1, V1>)delegate,
-            opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false));
+            opCtx != null ? opCtx.keepBinary() : new CacheOperationContext(false, null, true, null, false, null));
     }
 
     /** {@inheritDoc} */
@@ -1608,7 +1610,8 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
 
         try {
             return new GridCacheProxyImpl<>(ctx, delegate,
-                opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false));
+                opCtx != null ? opCtx.withExpiryPolicy(plc) :
+                    new CacheOperationContext(false, null, false, plc, false, null));
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b64c69c..9e66d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -307,7 +307,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
         try {
             CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
-                new CacheOperationContext(false, null, false, plc, false);
+                new CacheOperationContext(false, null, false, plc, false, null);
 
             return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
         }
@@ -339,7 +339,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 return this;
 
             CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) :
-                new CacheOperationContext(false, null, false, null, true);
+                new CacheOperationContext(false, null, false, null, true, null);
 
             return new IgniteCacheProxy<>(ctx,
                 delegate,
@@ -1788,7 +1788,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                     opCtx != null ? opCtx.subjectId() : null,
                     true,
                     opCtx != null ? opCtx.expiry() : null,
-                    opCtx != null && opCtx.noRetries());
+                    opCtx != null && opCtx.noRetries(),
+                    opCtx != null ? opCtx.dataCenterId() : null);
 
             return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
                 (GridCacheAdapter<K1, V1>)delegate,
@@ -1802,6 +1803,39 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
+     * @return Cache proxy that uses the given data center id ({@code this} if that id is already set).
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteCache<K, V> withDataCenterId(byte dataCenterId) {
+        CacheOperationContext prev = onEnter(gate, opCtx);
+
+        try {
+            Byte prevDataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
+            if (prevDataCenterId != null && dataCenterId == prevDataCenterId)
+                return this;
+
+            CacheOperationContext opCtx0 =
+                new CacheOperationContext(
+                    opCtx != null && opCtx.skipStore(),
+                    opCtx != null ? opCtx.subjectId() : null,
+                    opCtx != null && opCtx.isKeepBinary(),
+                    opCtx != null ? opCtx.expiry() : null,
+                    opCtx != null && opCtx.noRetries(),
+                    dataCenterId);
+
+            return new IgniteCacheProxy<>(ctx,
+                delegate,
+                opCtx0,
+                isAsync(),
+                lock);
+        }
+        finally {
+            onLeave(gate, prev);
+        }
+    }
+
+    /**
      * @return Cache with skip store enabled.
      */
     public IgniteCache<K, V> skipStore() {
@@ -1820,7 +1854,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                     opCtx != null ? opCtx.subjectId() : null,
                     opCtx != null && opCtx.isKeepBinary(),
                     opCtx != null ? opCtx.expiry() : null,
-                    opCtx != null && opCtx.noRetries());
+                    opCtx != null && opCtx.noRetries(),
+                    opCtx != null ? opCtx.dataCenterId() : null);
 
             return new IgniteCacheProxy<>(ctx,
                 delegate,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aa79cfa..6b23550 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -99,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -448,7 +448,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true,
             false,
             filter,
-            true);
+            true,
+            UPDATE);
     }
 
     /** {@inheritDoc} */
@@ -464,7 +465,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             filter,
-            true);
+            true,
+            UPDATE);
     }
 
     /** {@inheritDoc} */
@@ -479,7 +481,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true,
             false,
             ctx.noValArray(),
-            false).get();
+            false,
+            UPDATE).get();
     }
 
     /** {@inheritDoc} */
@@ -571,7 +574,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             true,
             true,
             ctx.equalsValArray(oldVal),
-            true);
+            true,
+            UPDATE);
     }
 
     /** {@inheritDoc} */
@@ -589,7 +593,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             CU.empty0(),
-            true).chain(RET2NULL);
+            true,
+            UPDATE).chain(RET2NULL);
     }
 
     /** {@inheritDoc} */
@@ -610,7 +615,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             null,
-            true);
+            true,
+            UPDATE);
     }
 
     /** {@inheritDoc} */
@@ -790,7 +796,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             null,
-            true);
+            true,
+            TRANSFORM);
 
         return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
             @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
@@ -846,7 +853,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             null,
-            true);
+            true,
+            TRANSFORM);
 
         return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() {
             @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException {
@@ -882,7 +890,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             false,
             null,
-            true);
+            true,
+            TRANSFORM);
     }
 
     /**
@@ -901,15 +910,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("ConstantConditions")
     private IgniteInternalFuture updateAllAsync0(
-        @Nullable final Map<? extends K, ? extends V> map,
-        @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+        @Nullable Map<? extends K, ? extends V> map,
+        @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
         @Nullable Object[] invokeArgs,
-        @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
-        @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
         final boolean retval,
         final boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter,
-        final boolean waitTopFut
+        final boolean waitTopFut,
+        final GridCacheOperation op
     ) {
         assert ctx.updatesAllowed();
 
@@ -918,7 +928,47 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
-        CacheOperationContext opCtx = ctx.operationContextPerCall();
+        final CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        if (opCtx != null && opCtx.hasDataCenterId()) {
+            assert conflictPutMap == null : conflictPutMap;
+            assert conflictRmvMap == null : conflictRmvMap;
+
+            if (op == GridCacheOperation.TRANSFORM) {
+                assert invokeMap != null : invokeMap;
+
+                conflictPutMap = F.viewReadOnly((Map)invokeMap,
+                    new IgniteClosure<EntryProcessor, GridCacheDrInfo>() {
+                        @Override public GridCacheDrInfo apply(EntryProcessor o) {
+                            return new GridCacheDrInfo(o, ctx.versions().next(opCtx.dataCenterId()));
+                        }
+                    });
+
+                invokeMap = null;
+            }
+            else if (op == GridCacheOperation.DELETE) {
+                assert map != null : map;
+
+                conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() {
+                    @Override public GridCacheVersion apply(V o) {
+                        return ctx.versions().next(opCtx.dataCenterId());
+                    }
+                });
+
+                map = null;
+            }
+            else {
+                assert map != null : map;
+
+                conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() {
+                    @Override public GridCacheDrInfo apply(V o) {
+                        return new GridCacheDrInfo(ctx.toCacheObject(o), ctx.versions().next(opCtx.dataCenterId()));
+                    }
+                });
+
+                map = null;
+            }
+        }
 
         UUID subjId = ctx.subjectIdPerCall(null, opCtx);
 
@@ -928,7 +978,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),
-            invokeMap != null ? TRANSFORM : UPDATE,
+            op,
             map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
                 conflictPutMap.keySet() : conflictRmvMap.keySet(),
             map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
@@ -966,8 +1016,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return Completion future.
      */
     private IgniteInternalFuture removeAllAsync0(
-        @Nullable final Collection<? extends K> keys,
-        @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictMap,
+        @Nullable Collection<? extends K> keys,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap,
         final boolean retval,
         boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter
@@ -985,12 +1035,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
-        CacheOperationContext opCtx = ctx.operationContextPerCall();
+        final CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         UUID subjId = ctx.subjectIdPerCall(null, opCtx);
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
+        Collection<GridCacheVersion> drVers = null;
+
+        if (opCtx != null && keys != null && opCtx.hasDataCenterId()) {
+            assert conflictMap == null : conflictMap;
+
+            drVers = F.transform(keys, new C1<K, GridCacheVersion>() {
+                @Override public GridCacheVersion apply(K k) {
+                    return ctx.versions().next(opCtx.dataCenterId());
+                }
+            });
+        }
+
         final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
@@ -1000,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             null,
             null,
             null,
-            keys != null ? null : conflictMap.values(),
+            drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
             retval,
             rawRetval,
             (filter != null && opCtx != null) ? opCtx.expiry() : null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3c86083..c9e1a11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -1034,7 +1034,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 else if (conflictPutVals != null) {
                     GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
 
-                    val = conflictPutVal.value();
+                    val = conflictPutVal.valueEx();
                     conflictVer = conflictPutVal.version();
                     conflictTtl =  conflictPutVal.ttl();
                     conflictExpireTime = conflictPutVal.expireTime();
@@ -1142,7 +1142,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 // Conflict PUT.
                 GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
 
-                val = conflictPutVal.value();
+                val = conflictPutVal.valueEx();
                 conflictVer = conflictPutVal.version();
                 conflictTtl = conflictPutVal.ttl();
                 conflictExpireTime = conflictPutVal.expireTime();

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 8635fe2..02bc6b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -36,6 +37,9 @@ public class GridCacheDrInfo implements Externalizable {
     /** Value. */
     private CacheObject val;
 
+    /** Entry processor. */
+    private EntryProcessor proc;
+
     /** DR version. */
     private GridCacheVersion ver;
 
@@ -61,6 +65,29 @@ public class GridCacheDrInfo implements Externalizable {
     }
 
     /**
+     * Constructor.
+     *
+     * @param ver Version.
+     */
+    public GridCacheDrInfo(GridCacheVersion ver) {
+        this.ver = ver;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param proc Entry processor.
+     * @param ver Version.
+     */
+    public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) {
+        assert proc != null;
+        assert ver != null;
+
+        this.proc = proc;
+        this.ver = ver;
+    }
+
+    /**
      * @return Value.
      */
     public CacheObject value() {
@@ -68,6 +95,20 @@ public class GridCacheDrInfo implements Externalizable {
     }
 
     /**
+     * @return Entry processor.
+     */
+    public EntryProcessor entryProcessor() {
+        return proc;
+    }
+
+    /**
+     * @return Value (entry processor or cache object).
+     */
+    public Object valueEx() {
+        return val == null ? proc : val;
+    }
+
+    /**
      * @return Version.
      */
     public GridCacheVersion version() {
@@ -88,13 +129,13 @@ public class GridCacheDrInfo implements Externalizable {
         return CU.EXPIRE_TIME_ETERNAL;
     }
 
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         assert false;
     }
 
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
         assert false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 926eaf2..aad9841 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1977,8 +1977,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         GridCacheContext cacheCtx,
         Map<KeyCacheObject, GridCacheDrInfo> drMap
     ) {
+        Map<KeyCacheObject, Object> map = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
+            @Override public Object apply(GridCacheDrInfo val) {
+                return val.value();
+            }
+        });
+
         return this.<Object, Object>putAllAsync0(cacheCtx,
-            null,
+            map,
             null,
             null,
             drMap,
@@ -2055,7 +2061,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         final GridCacheReturn ret,
         boolean skipStore,
         final boolean singleRmv,
-        boolean keepBinary) {
+        boolean keepBinary,
+        Byte dataCenterId) {
         try {
             addActiveCache(cacheCtx);
 
@@ -2066,6 +2073,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             if (entryProcessor != null)
                 transform = true;
 
+            GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
+
             boolean loadMissed = enlistWriteEntry(cacheCtx,
                 cacheKey,
                 val,
@@ -2075,7 +2084,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 retval,
                 lockOnly,
                 filter,
-                /*drVer*/null,
+                /*drVer*/drVer,
                 /*drTtl*/-1L,
                 /*drExpireTime*/-1L,
                 ret,
@@ -2125,6 +2134,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @param drRmvMap DR remove map (optional).
      * @param skipStore Skip store flag.
      * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}.
+     * @param keepBinary Keep binary flag.
+     * @param dataCenterId Optional data center ID.
      * @return Future for missing values loading.
      */
     private <K, V> IgniteInternalFuture<Void> enlistWrite(
@@ -2143,7 +2154,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         @Nullable Map<KeyCacheObject, GridCacheVersion> drRmvMap,
         boolean skipStore,
         final boolean singleRmv,
-        final boolean keepBinary
+        final boolean keepBinary,
+        Byte dataCenterId
     ) {
         assert retval || invokeMap == null;
 
@@ -2197,6 +2209,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     drTtl = -1L;
                     drExpireTime = -1L;
                 }
+                else if (dataCenterId != null) {
+                    drVer = cctx.versions().next(dataCenterId);
+                    drTtl = -1L;
+                    drExpireTime = -1L;
+                }
                 else {
                     drVer = null;
                     drTtl = -1L;
@@ -2938,6 +2955,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
+            final Byte dataCenterId = opCtx != null ? opCtx.dataCenterId() : null;
+
             KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
 
             boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
@@ -2955,7 +2974,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 ret,
                 opCtx != null && opCtx.skipStore(),
                 /*singleRmv*/false,
-                keepBinary);
+                keepBinary,
+                dataCenterId);
 
             if (pessimistic()) {
                 assert loadFut == null || loadFut.isDone() : loadFut;
@@ -3053,7 +3073,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         @Nullable Map<? extends K, ? extends V> map,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
         @Nullable final Object[] invokeArgs,
-        @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
         final boolean retval,
         @Nullable final CacheEntryPredicate[] filter
     ) {
@@ -3066,25 +3086,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             return new GridFinishedFuture(e);
         }
 
-        // Cached entry may be passed only from entry wrapper.
-        final Map<?, ?> map0;
-        final Map<?, EntryProcessor<K, V, Object>> invokeMap0;
+        final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
-        if (drMap != null) {
-            assert map == null;
+        final Byte dataCenterId;
 
-            map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() {
-                @Override public Object apply(GridCacheDrInfo val) {
-                    return val.value();
-                }
-            });
+        if (opCtx != null && opCtx.hasDataCenterId()) {
+            assert drMap == null : drMap;
+            assert map != null || invokeMap != null;
 
-            invokeMap0 = null;
-        }
-        else {
-            map0 = map;
-            invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
+            dataCenterId = opCtx.dataCenterId();
         }
+        else
+            dataCenterId = null;
+
+        // Cached entry may be passed only from entry wrapper.
+        final Map<?, ?> map0 = map;
+        final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
 
         if (log.isDebugEnabled())
             log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]");
@@ -3110,8 +3127,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
 
-            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
             final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
             final IgniteInternalFuture<Void> loadFut = enlistWrite(
@@ -3130,7 +3145,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 null,
                 opCtx != null && opCtx.skipStore(),
                 false,
-                keepBinary);
+                keepBinary,
+                dataCenterId);
 
             if (pessimistic()) {
                 assert loadFut == null || loadFut.isDone() : loadFut;
@@ -3334,6 +3350,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         else
             keys0 = keys;
 
+        CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+        final Byte dataCenterId;
+
+        if (opCtx != null && opCtx.hasDataCenterId()) {
+            assert drMap == null : drMap;
+
+            dataCenterId = opCtx.dataCenterId();
+        }
+        else
+            dataCenterId = null;
+
         assert keys0 != null;
 
         if (log.isDebugEnabled()) {
@@ -3367,8 +3395,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         final Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
-        CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
         ExpiryPolicy plc;
 
         if (!F.isEmpty(filter))
@@ -3394,7 +3420,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             drMap,
             opCtx != null && opCtx.skipStore(),
             singleRmv,
-            keepBinary
+            keepBinary,
+            dataCenterId
         );
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 68d03cd..166c713 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -176,7 +176,15 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version based on current topology.
      */
     public GridCacheVersion next() {
-        return next(cctx.kernalContext().discovery().topologyVersion(), true, false);
+        return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
+    }
+
+    /**
+     * @param dataCenterId Data center id.
+     * @return Next version based on current topology with given data center id.
+     */
+    public GridCacheVersion next(byte dataCenterId) {
+        return next(cctx.kernalContext().discovery().topologyVersion(), true, false, dataCenterId);
     }
 
     /**
@@ -188,7 +196,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version based on given topology version.
      */
     public GridCacheVersion next(AffinityTopologyVersion topVer) {
-        return next(topVer.topologyVersion(), true, false);
+        return next(topVer.topologyVersion(), true, false, dataCenterId);
     }
 
     /**
@@ -197,7 +205,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version for cache store operations.
      */
     public GridCacheVersion nextForLoad() {
-        return next(cctx.kernalContext().discovery().topologyVersion(), true, true);
+        return next(cctx.kernalContext().discovery().topologyVersion(), true, true, dataCenterId);
     }
 
     /**
@@ -206,7 +214,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version for cache store operations.
      */
     public GridCacheVersion nextForLoad(AffinityTopologyVersion topVer) {
-        return next(topVer.topologyVersion(), true, true);
+        return next(topVer.topologyVersion(), true, true, dataCenterId);
     }
 
     /**
@@ -215,7 +223,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version for cache store operations.
      */
     public GridCacheVersion nextForLoad(GridCacheVersion ver) {
-        return next(ver.topologyVersion(), false, true);
+        return next(ver.topologyVersion(), false, true, dataCenterId);
     }
 
     /**
@@ -225,7 +233,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @return Next version based on given cache version.
      */
     public GridCacheVersion next(GridCacheVersion ver) {
-        return next(ver.topologyVersion(), false, false);
+        return next(ver.topologyVersion(), false, false, dataCenterId);
     }
 
     /**
@@ -237,9 +245,10 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
      * @param topVer Topology version for which new version should be obtained.
      * @param addTime If {@code true} then adds to the given topology version number of seconds
      *        from the start time of the first grid node.
+     * @param dataCenterId Data center id.
      * @return New lock order.
      */
-    private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad) {
+    private GridCacheVersion next(long topVer, boolean addTime, boolean forLoad, byte dataCenterId) {
         if (topVer == -1)
             topVer = cctx.kernalContext().discovery().topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 99d1a42..8bf877a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1026,6 +1026,9 @@ public abstract class GridAbstractTest extends TestCase {
     protected IgniteConfiguration loadConfiguration(String springCfgPath) throws IgniteCheckedException {
         URL cfgLocation = U.resolveIgniteUrl(springCfgPath);
 
+        if (cfgLocation == null)
+            cfgLocation = U.resolveIgniteUrl(springCfgPath, false);
+
         assert cfgLocation != null;
 
         ApplicationContext springCtx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 21d8c69..c0f49c8 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -214,13 +214,6 @@
             <version>4.11</version>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-apache-license-gen</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope><!-- hack to have ignite-apache-license-gen at first place at mvn reactor -->
-        </dependency>
     </dependencies>
 
     <build>
@@ -715,48 +708,6 @@
                     </execution>
                 </executions>
             </plugin>
-
-            <plugin><!-- generates dependencies licenses -->
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-remote-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>ignite-dependencies</id>
-                        <goals>
-                            <goal>process</goal>
-                        </goals>
-                        <configuration>
-                            <resourceBundles>
-                                <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}</resourceBundle>
-                            </resourceBundles>
-                            <excludeTransitive>true</excludeTransitive>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <version>1.7</version>
-                <executions>
-                    <execution>
-                        <id>licenses-file-rename</id>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                        <phase>compile</phase>
-                        <configuration>
-                            <target>
-                                <!-- moving licenses generated by "ignite-dependencies" -->
-                                <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/>
-                            </target>
-                            <failOnError>false</failOnError>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
         </plugins>
     </build>
 
@@ -998,5 +949,67 @@
                 </dependency>
             </dependencies>
         </profile>
+
+        <profile>
+            <id>release</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.ignite</groupId>
+                    <artifactId>ignite-apache-license-gen</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope><!-- hack to make ignite-apache-license-gen come first in the mvn reactor -->
+                </dependency>
+            </dependencies>
+
+            <build>
+                <plugins>
+                    <plugin><!-- generates dependencies licenses -->
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-remote-resources-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>ignite-dependencies</id>
+                                <goals>
+                                    <goal>process</goal>
+                                </goals>
+                                <configuration>
+                                    <resourceBundles>
+                                        <resourceBundle>org.apache.ignite:ignite-apache-license-gen:${project.version}
+                                        </resourceBundle>
+                                    </resourceBundles>
+                                    <excludeTransitive>true</excludeTransitive>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-antrun-plugin</artifactId>
+                        <version>1.7</version>
+                        <executions>
+                            <execution>
+                                <id>licenses-file-rename</id>
+                                <goals>
+                                    <goal>run</goal>
+                                </goals>
+                                <phase>compile</phase>
+                                <configuration>
+                                    <target>
+                                        <!-- moving licenses generated by "ignite-dependencies" -->
+                                        <move file="${basedir}/target/classes/META-INF/licenses.txt" tofile="${basedir}/target/licenses/${project.artifactId}-licenses.txt"/>
+                                    </target>
+                                    <failOnError>false</failOnError>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e61602e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4f797e8..bead3ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -918,22 +918,6 @@
                     </execution>
                 </executions>
             </plugin>
-
-            <plugin><!-- skipping generates dependencies licenses -->
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-remote-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>ignite-dependencies</id>
-                        <goals>
-                            <goal>process</goal>
-                        </goals>
-                        <configuration>
-                           <skip>true</skip>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 </project>


[07/12] ignite git commit: IGNITE-1069 Added output of node type (server or client) in Visor commandline top command - Fixes #440.

Posted by sb...@apache.org.
IGNITE-1069 Added output of node type (server or client) in Visor commandline top command - Fixes #440.

Signed-off-by: Alexey Kuznetsov <ak...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59691291
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59691291
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59691291

Branch: refs/heads/ignite-2224
Commit: 59691291b55265e561ee5b707da8d95d2eee79db
Parents: 304370c
Author: kcheng <kc...@gmail.com>
Authored: Mon Feb 1 10:48:49 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 1 10:48:49 2016 +0700

----------------------------------------------------------------------
 .../apache/ignite/visor/commands/top/VisorTopologyCommand.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/59691291/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
index 5e278ed..d2ec662 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/top/VisorTopologyCommand.scala
@@ -263,12 +263,13 @@ class VisorTopologyCommand extends VisorConsoleCommand {
 
         val hostsT = VisorTextTable()
 
-        hostsT #= ("Int./Ext. IPs", "Node ID8(@)", "OS", "CPUs", "MACs", "CPU Load")
+        hostsT #= ("Int./Ext. IPs", "Node ID8(@)","Node Type", "OS", "CPUs", "MACs", "CPU Load")
 
         neighborhood.foreach {
             case (_, neighbors) =>
                 var ips = Set.empty[String]
                 var id8s = List.empty[String]
+                var nodeTypes = List.empty[String]
                 var macs = Set.empty[String]
                 var cpuLoadSum = 0.0
 
@@ -287,6 +288,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
                 neighbors.foreach(n => {
                     id8s = id8s :+ (i.toString + ": " + nodeId8(n.id))
 
+                    nodeTypes = nodeTypes :+ (if (n.isClient) "Client" else "Server")
                     i += 1
 
                     ips = ips ++ n.addresses()
@@ -300,6 +302,7 @@ class VisorTopologyCommand extends VisorConsoleCommand {
                 hostsT += (
                     ips.toSeq,
                     id8s,
+                    nodeTypes,
                     os,
                     cpus,
                     macs.toSeq,


[04/12] ignite git commit: IGNITE-2016: Kafka Connect integration - reflected review comments (avoiding setting same task parameters more than once). - Fixes #335.

Posted by sb...@apache.org.
IGNITE-2016: Kafka Connect integration - reflected review comments (avoiding setting same task parameters more than once). - Fixes #335.

Signed-off-by: shtykh_roman <rs...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c92c2747
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c92c2747
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c92c2747

Branch: refs/heads/ignite-2224
Commit: c92c274737391546b3dfe3ccbe527329c462d95f
Parents: 861236a
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Feb 1 10:35:02 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Mon Feb 1 10:35:02 2016 +0900

----------------------------------------------------------------------
 modules/kafka/README.txt                        | 111 +++++-
 modules/kafka/pom.xml                           |  69 ++--
 .../ignite/stream/kafka/KafkaStreamer.java      |   2 +-
 .../kafka/connect/IgniteSinkConnector.java      |  91 +++++
 .../kafka/connect/IgniteSinkConstants.java      |  38 ++
 .../stream/kafka/connect/IgniteSinkTask.java    | 165 ++++++++
 .../kafka/IgniteKafkaStreamerSelfTestSuite.java |   9 +-
 .../stream/kafka/KafkaEmbeddedBroker.java       | 387 -------------------
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  13 +-
 .../ignite/stream/kafka/SimplePartitioner.java  |  53 ---
 .../ignite/stream/kafka/TestKafkaBroker.java    | 237 ++++++++++++
 .../kafka/connect/IgniteSinkConnectorTest.java  | 250 ++++++++++++
 .../kafka/src/test/resources/example-ignite.xml |  71 ++++
 parent/pom.xml                                  |   9 +-
 14 files changed, 1011 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index 1eaf861..f4e56bd 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -1,16 +1,17 @@
 Apache Ignite Kafka Streamer Module
-------------------------
+-----------------------------------
 
 Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
 
-To enable Kafka Streamer module when starting a standalone node, move 'optional/ignite-Kafka' folder to
-'libs' folder before running 'ignite.{sh|bat}' script. The content of the module folder will
-be added to classpath in this case.
+There are two ways this can be achieved:
+- importing the Kafka Streamer module into your Maven project and instantiating KafkaStreamer for data streaming;
+- using Kafka Connect functionality.
 
-Importing Ignite Kafka Streamer Module In Maven Project
--------------------------------------
+Below are the details.
 
-If you are using Maven to manage dependencies of your project, you can add JCL module
+## Importing Ignite Kafka Streamer Module In Maven Project
+
+If you are using Maven to manage dependencies of your project, you can add Kafka module
 dependency like this (replace '${ignite.version}' with actual Ignite version you are
 interested in):
 
@@ -30,3 +31,99 @@ interested in):
     </dependencies>
     ...
 </project>
+
+
+## Streaming Data via Kafka Connect
+
+Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
+For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
+
+The connector can be found in 'optional/ignite-kafka'. It and its dependencies have to be on the classpath of a running Kafka instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-1.5.0-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=string-ignite-connector
+connector.class=IgniteSinkConnector
+tasks.max=2
+topics=testTopic1,testTopic2
+
+# cache
+cacheName=cache1
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml'; the data from 'testTopic1,testTopic2'
+will be pulled and stored in it. Set 'cacheAllowOverwrite' to true if you want to enable overwriting existing values in cache.
+You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust per-node buffer and the maximum number
+of parallel stream operations for a single node.
+
+See example-ignite.xml in tests for a simple cache configuration file example.
+
+4. Start connector, for instance, as follows,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+## Checking the Flow
+
+To perform a very basic functionality check, you can do the following,
+
+1. Start Zookeeper
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+2. Start Kafka server
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Provide some data input to the Kafka server
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+```
+
+4. Start the connector. For example,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+5. Check that the value is in the cache. For example, via REST,
+```
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+```

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index e00b190..0ac0487 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -20,7 +20,8 @@
 <!--
     POM file.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -43,48 +44,28 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
             <version>${kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.sun.jmx</groupId>
-                    <artifactId>jmxri</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jdmk</groupId>
-                    <artifactId>jmxtools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>net.sf.jopt-simple</groupId>
-                    <artifactId>jopt-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <version>${zookeeper.version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${kafka.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-log4j</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-all</artifactId>
-            <version>${asm.version}</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
@@ -96,11 +77,33 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>${easymock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index cbc5b1b..487c369 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -224,4 +224,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
new file mode 100644
index 0000000..9385920
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Sink connector to manage sink tasks that transfer Kafka data to Ignite grid.
+ */
+public class IgniteSinkConnector extends SinkConnector {
+    /** Sink properties. */
+    private Map<String, String> configProps;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /**
+     * A sink lifecycle method. Validates grid-specific sink properties.
+     *
+     * @param props Sink properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        configProps = props;
+
+        try {
+            A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+        }
+        catch (IllegalArgumentException e) {
+            throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+        }
+    }
+
+    /**
+     * Obtains a sink task class to be instantiated for feeding data into grid.
+     *
+     * @return IgniteSinkTask class.
+     */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSinkTask.class;
+    }
+
+    /**
+     * Builds each config for <tt>maxTasks</tt> tasks.
+     *
+     * @param maxTasks Max number of tasks.
+     * @return Task configs.
+     */
+    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        Map<String, String> taskProps = new HashMap<>();
+
+        taskProps.putAll(configProps);
+
+        for (int i = 0; i < maxTasks; i++)
+            taskConfigs.add(taskProps);
+
+        return taskConfigs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
new file mode 100644
index 0000000..7680d96
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSinkConstants {
+    /** Ignite configuration file path. */
+    public static final String CACHE_CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Flag to enable overwriting existing values in cache. */
+    public static final String CACHE_ALLOW_OVERWRITE = "cacheAllowOverwrite";
+
+    /** Size of per-node buffer before data is sent to remote node. */
+    public static final String CACHE_PER_NODE_DATA_SIZE = "cachePerNodeDataSize";
+
+    /** Maximum number of parallel stream operations per node. */
+    public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
new file mode 100644
index 0000000..3d9a00d
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume sequences of SinkRecords and write data to grid.
+ */
+public class IgniteSinkTask extends SinkTask {
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
+
+    /** Flag for stopped state. */
+    private static volatile boolean stopped = true;
+
+    /** Ignite grid configuration file. */
+    private static String igniteConfigFile;
+
+    /** Cache name. */
+    private static String cacheName;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return new IgniteSinkConnector().version();
+    }
+
+    /**
+     * Initializes grid client from configPath.
+     *
+     * @param props Task properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        // Each task has the same parameters -- avoid setting more than once.
+        if (cacheName != null)
+            return;
+
+        cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
+        igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+            StreamerContext.getStreamer().allowOverwrite(
+                Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+            StreamerContext.getStreamer().perNodeBufferSize(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+            StreamerContext.getStreamer().perNodeParallelOperations(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
+        stopped = false;
+    }
+
+    /**
+     * Buffers records.
+     *
+     * @param records Records to inject into grid.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public void put(Collection<SinkRecord> records) {
+        try {
+            for (SinkRecord record : records) {
+                if (record.key() != null) {
+                    // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+                    StreamerContext.getStreamer().addData(record.key(), record.value());
+                }
+                else {
+                    log.error("Failed to stream a record with null key!");
+                }
+
+            }
+        }
+        catch (ConnectException e) {
+            log.error("Failed adding record", e);
+
+            throw new ConnectException(e);
+        }
+    }
+
+    /**
+     * Pushes buffered data to grid. Flush interval is configured by worker configurations.
+     *
+     * @param offsets Offset information.
+     */
+    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (stopped)
+            return;
+
+        StreamerContext.getStreamer().flush();
+    }
+
+    /**
+     * Stops the grid client.
+     */
+    @Override public void stop() {
+        if (stopped)
+            return;
+
+        stopped = true;
+
+        StreamerContext.getStreamer().close();
+        StreamerContext.getIgnite().close();
+    }
+
+    /**
+     * Streamer context initializing grid and data streamer instances on demand.
+     */
+    public static class StreamerContext {
+        /** Constructor. */
+        private StreamerContext() {
+        }
+
+        /** Instance holder. */
+        private static class Holder {
+            private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
+        }
+
+        /**
+         * Obtains grid instance.
+         *
+         * @return Grid instance.
+         */
+        public static Ignite getIgnite() {
+            return Holder.IGNITE;
+        }
+
+        /**
+         * Obtains data streamer instance.
+         *
+         * @return Data streamer instance.
+         */
+        public static IgniteDataStreamer getStreamer() {
+            return Holder.STREAMER;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
index 9115ab4..731f540 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -18,9 +18,10 @@
 package org.apache.ignite.stream.kafka;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
 
 /**
- * Apache Kafka streamer tests.
+ * Apache Kafka streamers tests.
  */
 public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
     /**
@@ -30,8 +31,12 @@ public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite");
 
+        // Kafka streamer.
         suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
 
+        // Kafka streamer via Connect API.
+        suite.addTest(new TestSuite(IgniteSinkConnectorTest.class));
+
         return suite;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
deleted file mode 100644
index 5e7cee7..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-import kafka.admin.AdminUtils;
-import kafka.api.LeaderAndIsr;
-import kafka.api.PartitionStateInfo;
-import kafka.api.Request;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.StringEncoder;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
-/**
- * Kafka Embedded Broker.
- */
-public class KafkaEmbeddedBroker {
-    /** Default ZooKeeper host. */
-    private static final String ZK_HOST = "localhost";
-
-    /** Broker port. */
-    private static final int BROKER_PORT = 9092;
-
-    /** ZooKeeper connection timeout. */
-    private static final int ZK_CONNECTION_TIMEOUT = 6000;
-
-    /** ZooKeeper session timeout. */
-    private static final int ZK_SESSION_TIMEOUT = 6000;
-
-    /** ZooKeeper port. */
-    private static int zkPort = 0;
-
-    /** Is ZooKeeper ready. */
-    private boolean zkReady;
-
-    /** Kafka config. */
-    private KafkaConfig brokerCfg;
-
-    /** Kafka server. */
-    private KafkaServer kafkaSrv;
-
-    /** ZooKeeper client. */
-    private ZkClient zkClient;
-
-    /** Embedded ZooKeeper. */
-    private EmbeddedZooKeeper zooKeeper;
-
-    /**
-     * Creates an embedded Kafka broker.
-     */
-    public KafkaEmbeddedBroker() {
-        try {
-            setupEmbeddedZooKeeper();
-
-            setupEmbeddedKafkaServer();
-        }
-        catch (IOException | InterruptedException e) {
-            throw new RuntimeException("Failed to start Kafka broker " + e);
-        }
-    }
-
-    /**
-     * @return ZooKeeper address.
-     */
-    public static String getZKAddress() {
-        return ZK_HOST + ':' + zkPort;
-    }
-
-    /**
-     * Creates a Topic.
-     *
-     * @param topic Topic name.
-     * @param partitions Number of partitions for the topic.
-     * @param replicationFactor Replication factor.
-     * @throws TimeoutException If operation is timed out.
-     * @throws InterruptedException If interrupted.
-     */
-    public void createTopic(String topic, int partitions, int replicationFactor)
-        throws TimeoutException, InterruptedException {
-        AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-
-        waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
-    }
-
-    /**
-     * Sends message to Kafka broker.
-     *
-     * @param keyedMessages List of keyed messages.
-     * @return Producer used to send the message.
-     */
-    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
-        Producer<String, String> producer = new Producer<>(getProducerConfig());
-
-        producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
-
-        return producer;
-    }
-
-    /**
-     * Shuts down Kafka broker.
-     */
-    public void shutdown() {
-        zkReady = false;
-
-        if (kafkaSrv != null)
-            kafkaSrv.shutdown();
-
-        List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());
-
-        for (String logDir : logDirs)
-            U.delete(new File(logDir));
-
-        if (zkClient != null) {
-            zkClient.close();
-
-            zkClient = null;
-        }
-
-        if (zooKeeper != null) {
-
-            try {
-                zooKeeper.shutdown();
-            }
-            catch (IOException e) {
-                // No-op.
-            }
-
-            zooKeeper = null;
-        }
-    }
-
-    /**
-     * @return ZooKeeper client.
-     */
-    private ZkClient getZkClient() {
-        A.ensure(zkReady, "Zookeeper not setup yet");
-        A.notNull(zkClient, "Zookeeper client is not yet initialized");
-
-        return zkClient;
-    }
-
-    /**
-     * Checks if topic metadata is propagated.
-     *
-     * @param topic Topic name.
-     * @param part Partition.
-     * @return {@code True} if propagated, otherwise {@code false}.
-     */
-    private boolean isMetadataPropagated(String topic, int part) {
-        scala.Option<PartitionStateInfo> partStateOption =
-            kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);
-
-        if (!partStateOption.isDefined())
-            return false;
-
-        PartitionStateInfo partState = partStateOption.get();
-
-        LeaderAndIsr LeaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
-        return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
-            Request.isValidBrokerId(LeaderAndIsr.leader()) && LeaderAndIsr.isr().size() >= 1;
-    }
-
-    /**
-     * Waits until metadata is propagated.
-     *
-     * @param topic Topic name.
-     * @param part Partition.
-     * @param timeout Timeout value in millis.
-     * @param interval Interval in millis to sleep.
-     * @throws TimeoutException If operation is timed out.
-     * @throws InterruptedException If interrupted.
-     */
-    private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
-        throws TimeoutException, InterruptedException {
-        int attempt = 1;
-
-        long startTime = System.currentTimeMillis();
-
-        while (true) {
-            if (isMetadataPropagated(topic, part))
-                return;
-
-            long duration = System.currentTimeMillis() - startTime;
-
-            if (duration < timeout)
-                Thread.sleep(interval);
-            else
-                throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);
-
-            attempt++;
-        }
-    }
-
-    /**
-     * Sets up embedded Kafka server.
-     *
-     * @throws IOException If failed.
-     */
-    private void setupEmbeddedKafkaServer() throws IOException {
-        A.ensure(zkReady, "Zookeeper should be setup before hand");
-
-        brokerCfg = new KafkaConfig(getBrokerConfig());
-
-        kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
-
-        kafkaSrv.startup();
-    }
-
-    /**
-     * Sets up embedded ZooKeeper.
-     *
-     * @throws IOException If failed.
-     * @throws InterruptedException If interrupted.
-     */
-    private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
-        EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
-
-        zooKeeper.startup();
-
-        zkPort = zooKeeper.getActualPort();
-
-        zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
-
-        zkReady = true;
-    }
-
-    /**
-     * @return Kafka broker address.
-     */
-    private static String getBrokerAddress() {
-        return ZK_HOST + ':' + BROKER_PORT;
-    }
-
-    /**
-     * Gets Kafka broker config.
-     *
-     * @return Kafka broker config.
-     * @throws IOException If failed.
-     */
-    private static Properties getBrokerConfig() throws IOException {
-        Properties props = new Properties();
-
-        props.put("broker.id", "0");
-        props.put("host.name", ZK_HOST);
-        props.put("port", "" + BROKER_PORT);
-        props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
-        props.put("zookeeper.connect", getZKAddress());
-        props.put("log.flush.interval.messages", "1");
-        props.put("replica.socket.timeout.ms", "1500");
-
-        return props;
-    }
-
-    /**
-     * @return Kafka Producer config.
-     */
-    private static ProducerConfig getProducerConfig() {
-        Properties props = new Properties();
-
-        props.put("metadata.broker.list", getBrokerAddress());
-        props.put("serializer.class", StringEncoder.class.getName());
-        props.put("key.serializer.class", StringEncoder.class.getName());
-        props.put("partitioner.class", SimplePartitioner.class.getName());
-
-        return new ProducerConfig(props);
-    }
-
-    /**
-     * Creates temp directory.
-     *
-     * @param prefix Prefix.
-     * @return Created file.
-     * @throws IOException If failed.
-     */
-    private static File createTempDir( String prefix) throws IOException {
-        Path path = Files.createTempDirectory(prefix);
-
-        return path.toFile();
-    }
-
-    /**
-     * Creates embedded ZooKeeper.
-     */
-    private static class EmbeddedZooKeeper {
-        /** Default ZooKeeper host. */
-        private final String zkHost;
-
-        /** Default ZooKeeper port. */
-        private final int zkPort;
-
-        /** NIO context factory. */
-        private NIOServerCnxnFactory factory;
-
-        /** Snapshot directory. */
-        private File snapshotDir;
-
-        /** Log directory. */
-        private File logDir;
-
-        /**
-         * Creates an embedded ZooKeeper.
-         *
-         * @param zkHost ZooKeeper host.
-         * @param zkPort ZooKeeper port.
-         */
-        EmbeddedZooKeeper(String zkHost, int zkPort) {
-            this.zkHost = zkHost;
-            this.zkPort = zkPort;
-        }
-
-        /**
-         * Starts up ZooKeeper.
-         *
-         * @throws IOException If failed.
-         * @throws InterruptedException If interrupted.
-         */
-        void startup() throws IOException, InterruptedException {
-            snapshotDir = createTempDir("_ss");
-
-            logDir = createTempDir("_log");
-
-            ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
-
-            factory = new NIOServerCnxnFactory();
-
-            factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
-
-            factory.startup(zkSrv);
-        }
-
-        /**
-         * @return Actual port ZooKeeper is started.
-         */
-        int getActualPort() {
-            return factory.getLocalPort();
-        }
-
-        /**
-         * Shuts down ZooKeeper.
-         *
-         * @throws IOException If failed.
-         */
-        void shutdown() throws IOException {
-            if (factory != null) {
-                factory.shutdown();
-
-                U.delete(snapshotDir);
-
-                U.delete(logDir);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 927ba3d..829c877 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.consumer.ConsumerConfig;
 import kafka.producer.KeyedMessage;
@@ -40,14 +41,13 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.getZKAddress;
 
 /**
  * Tests {@link KafkaStreamer}.
  */
 public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
     /** Embedded Kafka. */
-    private KafkaEmbeddedBroker embeddedBroker;
+    private TestKafkaBroker embeddedBroker;
 
     /** Count. */
     private static final int CNT = 100;
@@ -77,7 +77,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
 
-        embeddedBroker = new KafkaEmbeddedBroker();
+        embeddedBroker = new TestKafkaBroker();
     }
 
     /** {@inheritDoc} */
@@ -176,7 +176,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
             kafkaStmr.setThreads(4);
 
             // Set the consumer configuration.
-            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));
+            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
 
             // Set the decoders.
             StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
@@ -199,7 +199,8 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
             ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
 
-            latch.await();
+            // Checks all events successfully processed in 10 seconds.
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
 
             for (Map.Entry<String, String> entry : keyValMap.entrySet())
                 assertEquals(entry.getValue(), cache.get(entry.getKey()));
@@ -232,4 +233,4 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
         return new ConsumerConfig(props);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
deleted file mode 100644
index b49bebe..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Simple partitioner for Kafka.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner implements Partitioner {
-    /**
-     * Constructs instance.
-     *
-     * @param props Properties.
-     */
-    public SimplePartitioner(VerifiableProperties props) {
-        // No-op.
-    }
-
-    /**
-     * Partitions the key based on the key value.
-     *
-     * @param key Key.
-     * @param partSize Partition size.
-     * @return partition Partition.
-     */
-    public int partition(Object key, int partSize) {
-        String keyStr = (String)key;
-
-        String[] keyValues = keyStr.split("\\.");
-
-        Integer intKey = Integer.parseInt(keyValues[3]);
-
-        return intKey > 0 ? intKey % partSize : 0;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
new file mode 100644
index 0000000..70acb78
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import kafka.producer.ProducerConfig;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.curator.test.TestingServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import scala.Tuple2;
+
+/**
+ * Kafka Test Broker.
+ */
+public class TestKafkaBroker {
+    /** ZooKeeper connection timeout. */
+    private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+    /** ZooKeeper session timeout. */
+    private static final int ZK_SESSION_TIMEOUT = 6000;
+
+    /** ZooKeeper port. */
+    private static final int ZK_PORT = 21811;
+
+    /** Broker host. */
+    private static final String BROKER_HOST = "localhost";
+
+    /** Broker port. */
+    private static final int BROKER_PORT = 9092;
+
+    /** Kafka config. */
+    private KafkaConfig kafkaCfg;
+
+    /** Kafka server. */
+    private KafkaServer kafkaSrv;
+
+    /** ZooKeeper. */
+    private TestingServer zkServer;
+
+    /** Kafka Zookeeper utils. */
+    private ZkUtils zkUtils;
+
+    /**
+     * Starts ZooKeeper and then the Kafka server; a startup failure is rethrown with the original cause kept.
+     */
+    public TestKafkaBroker() {
+        try {
+            setupZooKeeper();
+
+            setupKafkaServer();
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to start Kafka: " + e, e);
+        }
+    }
+
+    /**
+     * Creates a topic on this broker via Kafka's {@code TestUtils.createTopic}.
+     *
+     * @param topic Name of the topic to create.
+     * @param partitions How many partitions the topic gets.
+     * @param replicationFactor Replication factor of the topic.
+     * @throws TimeoutException If operation is timed out.
+     * @throws InterruptedException If interrupted.
+     */
+    public void createTopic(String topic, int partitions, int replicationFactor)
+        throws TimeoutException, InterruptedException {
+        // This broker is the only server passed to the topic creation call.
+        List<KafkaServer> brokers = new ArrayList<>(1);
+        brokers.add(kafkaSrv);
+
+        TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
+            scala.collection.JavaConversions.asScalaBuffer(brokers), new Properties());
+    }
+
+    /**
+     * Sends the given messages to the Kafka broker through a newly created producer.
+     *
+     * @param keyedMessages Keyed messages to send.
+     * @return The producer the messages were sent with; it is not closed here.
+     */
+    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+        Producer<String, String> sender = new Producer<>(getProducerConfig());
+        // The producer's send() is given the Java list converted to a Scala buffer.
+        sender.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
+        return sender;
+    }
+
+    /**
+     * Shuts down the ZooKeeper utils, the Kafka server and the ZooKeeper server, then deletes their on-disk data.
+     */
+    public void shutdown() {
+        if (zkUtils != null)
+            zkUtils.close();
+
+        if (kafkaSrv != null)
+            kafkaSrv.shutdown();
+
+        if (zkServer != null) {
+            try {
+                zkServer.close(); // Unlike stop(), close() also removes the server's temporary directory.
+            }
+            catch (IOException e) {
+                // No-op: shutdown is best-effort.
+            }
+        }
+
+        List<String> logDirs = scala.collection.JavaConversions.seqAsJavaList(kafkaCfg.logDirs());
+
+        for (String logDir : logDirs)
+            U.delete(new File(logDir));
+    }
+
+    /**
+     * Sets up the test Kafka broker: builds its config, creates the server and starts it.
+     *
+     * @throws IOException If the broker properties could not be prepared.
+     */
+    private void setupKafkaServer() throws IOException {
+        kafkaCfg = new KafkaConfig(getKafkaConfig());
+
+        kafkaSrv = TestUtils.createServer(kafkaCfg, SystemTime$.MODULE$);
+        // NOTE(review): createServer() may already start the server - confirm this explicit call is still needed.
+        kafkaSrv.startup();
+    }
+
+    /**
+     * Starts the test ZooKeeper server on {@code ZK_PORT} and connects {@code zkUtils} to it.
+     *
+     * @throws Exception If failed.
+     */
+    private void setupZooKeeper() throws Exception {
+        zkServer = new TestingServer(ZK_PORT, true);
+
+        Tuple2<ZkClient, ZkConnection> clientAndConn = ZkUtils.createZkClientAndConnection(
+            zkServer.getConnectString(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
+
+        zkUtils = new ZkUtils(clientAndConn._1(), clientAndConn._2(), false);
+    }
+
+    /**
+     * Builds the broker properties: broker id 0, the test ZooKeeper server and a fresh temporary log directory.
+     *
+     * @return Kafka broker properties; every value is a string.
+     * @throws IOException If the temporary log directory could not be created.
+     */
+    private Properties getKafkaConfig() throws IOException {
+        Properties props = new Properties();
+
+        props.put("broker.id", "0");
+        props.put("zookeeper.connect", zkServer.getConnectString());
+        props.put("host.name", BROKER_HOST);
+        props.put("port", String.valueOf(BROKER_PORT)); // Properties values are expected to be strings.
+        props.put("offsets.topic.replication.factor", "1");
+        props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
+        props.put("log.flush.interval.messages", "1");
+
+        return props;
+    }
+
+    /**
+     * Returns the broker address in {@code host:port} form.
+     *
+     * @return Kafka broker address built from {@code BROKER_HOST} and {@code BROKER_PORT}.
+     */
+    public String getBrokerAddress() {
+        return BROKER_HOST + ':' + BROKER_PORT;
+    }
+
+    /**
+     * Returns the ZooKeeper address in {@code host:port} form.
+     *
+     * @return Zookeeper address built from {@code BROKER_HOST} and {@code ZK_PORT}.
+     */
+    public String getZookeeperAddress() {
+        return BROKER_HOST + ':' + ZK_PORT;
+    }
+
+    /**
+     * Builds the producer configuration pointing at this broker, with string-encoded messages.
+     *
+     * @return Kafka Producer config.
+     */
+    private ProducerConfig getProducerConfig() {
+        Properties producerProps = new Properties();
+
+        producerProps.put("metadata.broker.list", getBrokerAddress());
+        producerProps.put("bootstrap.servers", getBrokerAddress());
+        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
+
+        return new ProducerConfig(producerProps);
+    }
+
+    /**
+     * Creates a new temporary directory whose name starts with the given prefix.
+     *
+     * The directory is created through {@link Files#createTempDirectory(String,
+     * java.nio.file.attribute.FileAttribute[])} and handed back as a {@link File}.
+     *
+     * @param prefix Directory name prefix.
+     * @return Created directory.
+     * @throws IOException If the directory could not be created.
+     */
+    private static File createTmpDir(String prefix) throws IOException {
+        return Files.createTempDirectory(prefix).toFile();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
new file mode 100644
index 0000000..a8583d0
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.mock;
+
+/**
+ * Tests for {@link IgniteSinkConnector}.
+ */
+public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
+    /** Number of input messages. */
+    private static final int EVENT_CNT = 10000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** Test topics. */
+    private static final String[] TOPICS = {"test1", "test2"};
+
+    /** Kafka partition. */
+    private static final int PARTITIONS = 3;
+
+    /** Kafka replication factor. */
+    private static final int REPLICATION_FACTOR = 1;
+
+    /** Test Kafka broker. */
+    private TestKafkaBroker kafkaBroker;
+
+    /** Worker to run tasks. */
+    private Worker worker;
+
+    /** Workers' herder. */
+    private Herder herder;
+
+    /** Ignite server node. */
+    private Ignite grid;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTest() throws Exception {
+        IgniteConfiguration srvCfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+        // The XML enables client mode; the node started by this test must be a server.
+        srvCfg.setClientMode(false);
+        grid = startGrid("igniteServerNode", srvCfg);
+
+        kafkaBroker = new TestKafkaBroker();
+
+        for (String topicName : TOPICS)
+            kafkaBroker.createTopic(topicName, PARTITIONS, REPLICATION_FACTOR);
+
+        WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps());
+
+        OffsetBackingStore offsetStore = mock(OffsetBackingStore.class);
+        offsetStore.configure(anyObject(Map.class));
+
+        worker = new Worker(workerCfg, offsetStore);
+        worker.start();
+
+        herder = new StandaloneHerder(worker);
+        herder.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (herder != null) // Each field may still be null if beforeTest() did not complete.
+            herder.stop();
+        if (worker != null)
+            worker.stop();
+        if (kafkaBroker != null)
+            kafkaBroker.shutdown();
+        stopAllGrids(); // Reached even after a partial setup, so started grids do not leak.
+    }
+
+    /**
+     * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
+     * specified Kafka topics, because a sink task can read from multiple topics.
+     *
+     * @throws Exception Thrown in case of the failure.
+     */
+    public void testSinkPuts() throws Exception {
+        Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+            @Override
+            public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                if (error != null)
+                    throw new RuntimeException("Failed to create a job!", error);
+            }
+        });
+
+        herder.putConnectorConfig(
+            sinkProps.get(ConnectorConfig.NAME_CONFIG),
+            sinkProps, false, cb);
+        // Wait for the connector registration to complete before producing any data.
+        cb.get();
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT * TOPICS.length);
+
+        final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+
+        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map<String, String> keyValMap = new HashMap<>(EVENT_CNT * TOPICS.length);
+
+        // Produces events for the specified number of topics
+        for (String topic : TOPICS)
+            keyValMap.putAll(produceStream(topic));
+
+        // Checks all events successfully processed in 10 seconds.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+        // Checks that each event was processed properly.
+        for (Map.Entry<String, String> entry : keyValMap.entrySet())
+            assertEquals(entry.getValue(), cache.get(entry.getKey()));
+
+        assertEquals(EVENT_CNT * TOPICS.length, cache.size(CachePeekMode.PRIMARY));
+    }
+
+    /**
+     * Generates {@code EVENT_CNT} keyed messages for the topic and sends them to Kafka in a single call.
+     *
+     * @param topic Topic name.
+     * @return Sent messages as a key to value map.
+     */
+    private Map<String, String> produceStream(String topic) {
+        Map<String, String> sent = new HashMap<>();
+
+        List<KeyedMessage<String, String>> batch = new ArrayList<>(EVENT_CNT);
+
+        for (int idx = 0; idx < EVENT_CNT; idx++) {
+            // Key is "<topic>_<idx>"; value is the current time in millis followed by idx.
+            long now = System.currentTimeMillis();
+            String key = topic + "_" + idx;
+            String val = String.valueOf(now) + idx;
+
+            batch.add(new KeyedMessage<>(topic, key, val));
+
+            sent.put(key, val);
+        }
+
+        Producer<String, String> sender = kafkaBroker.sendMessages(batch);
+
+        sender.close();
+
+        return sent;
+    }
+
+    /**
+     * Creates properties for the test sink connector, targeting the cache named {@code CACHE_NAME}.
+     *
+     * @param topics Topics the sink reads from (the test passes them joined with commas).
+     * @return Test sink connector properties.
+     */
+    private Map<String, String> makeSinkProps(String topics) {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(ConnectorConfig.TOPICS_CONFIG, topics);
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-connector");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
+        props.put(IgniteSinkConstants.CACHE_NAME, CACHE_NAME); // Same constant the test uses to read the cache.
+        props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
+        props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+
+        return props;
+    }
+
+    /**
+     * Builds the Kafka Connect worker configuration: string converters everywhere, schemas disabled.
+     *
+     * @return Worker configurations.
+     */
+    private Map<String, String> makeWorkerProps() {
+        Map<String, String> workerProps = new HashMap<>();
+        String strConverter = "org.apache.kafka.connect.storage.StringConverter";
+
+        workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, strConverter);
+        workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, strConverter);
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, strConverter);
+        workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, strConverter);
+        workerProps.put("key.converter.schemas.enable", "false");
+        workerProps.put("value.converter.schemas.enable", "false");
+        workerProps.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); // Fast flushing for testing.
+
+        return workerProps;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/resources/example-ignite.xml b/modules/kafka/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..fbb05d3
--- /dev/null
+++ b/modules/kafka/src/test/resources/example-ignite.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    Ignite configuration with defaults, client mode enabled and EVT_CACHE_OBJECT_PUT cache events enabled.
+    Used for testing IgniteSink, which runs Ignite in client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Enable client mode. -->
+        <property name="clientMode" value="true"/>
+
+        <!-- Cache accessed from IgniteSink. -->
+        <property name="cacheConfiguration">
+            <list>
+                <!-- Partitioned cache example configuration; its settings are adjusted to match the server nodes'. -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="name" value="testCache"/>
+                </bean>
+            </list>
+        </property>
+
+        <!-- Enable cache events. -->
+        <property name="includeEventTypes">
+            <list>
+                <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+            </list>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>127.0.0.1:47500</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 8f02fec..21d8c69 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -61,6 +61,7 @@
         <commons.lang.version>2.6</commons.lang.version>
         <cron4j.version>2.2.5</cron4j.version>
         <curator.version>2.9.1</curator.version>
+        <easymock.version>3.4</easymock.version>
         <ezmorph.bundle.version>1.0.6_1</ezmorph.bundle.version>
         <ezmorph.version>1.0.6</ezmorph.version>
         <flume.ng.version>1.6.0</flume.ng.version>
@@ -82,11 +83,9 @@
         <jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
         <jsonlib.version>2.4</jsonlib.version>
         <jtidy.version>r938</jtidy.version>
-        <kafka.bundle.version>0.8.2.1_1</kafka.bundle.version>
-        <kafka.clients.bundle.version>0.8.2.0_1</kafka.clients.bundle.version>
-        <kafka.clients.version>0.8.2.0</kafka.clients.version>
-        <kafka.version>0.8.2.1</kafka.version>
-        <kafka.version>0.8.2.1</kafka.version>
+        <kafka.bundle.version>0.9.0.0_1</kafka.bundle.version>
+        <kafka.clients.bundle.version>0.9.0.0_1</kafka.clients.bundle.version>
+        <kafka.version>0.9.0.0</kafka.version>
         <karaf.version>4.0.2</karaf.version>
         <lucene.bundle.version>3.5.0_1</lucene.bundle.version>
         <lucene.version>3.5.0</lucene.version>