You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/11/04 15:10:53 UTC

[10/36] ignite git commit: IGNITE-426 Cleanup code.

IGNITE-426 Cleanup code.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/785539be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/785539be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/785539be

Branch: refs/heads/ignite-462-2
Commit: 785539be09046c6b84669586c2cedbc939f23349
Parents: 61870a4
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Oct 22 13:30:26 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Nov 4 17:02:36 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 10 ++---
 .../cache/GridCacheUpdateTxResult.java          |  8 +---
 .../dht/GridClientPartitionTopology.java        |  2 -
 .../distributed/dht/GridDhtLocalPartition.java  | 40 +++++++++-----------
 .../dht/GridDhtPartitionTopologyImpl.java       |  6 +--
 .../dht/atomic/GridDhtAtomicCache.java          | 12 +++++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  6 +--
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 19 +++++++++-
 .../distributed/near/GridNearAtomicCache.java   |  2 +-
 .../continuous/CacheContinuousQueryEntry.java   |  7 ++++
 .../continuous/CacheContinuousQueryHandler.java |  1 -
 .../CacheContinuousQueryListener.java           |  4 +-
 .../continuous/CacheContinuousQueryManager.java |  3 +-
 13 files changed, 65 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 2c3bf8c..bbd2ce0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1768,7 +1768,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         CacheObject oldVal;
         CacheObject updated;
 
-        GridCacheVersion rmvVer = null;
+        GridCacheVersion enqueueVer = null;
 
         GridCacheVersionConflictContext<?, ?> conflictCtx = null;
 
@@ -2325,7 +2325,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
 
-                rmvVer = newVer;
+                enqueueVer = newVer;
 
                 boolean hasValPtr = hasOffHeapPointer();
 
@@ -2404,7 +2404,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             invokeRes,
             newSysTtl,
             newSysExpireTime,
-            rmvVer,
+            enqueueVer,
             conflictCtx,
             true,
             updateIdx0);
@@ -4156,9 +4156,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      */
     protected void deletedUnlocked(boolean deleted) {
         assert Thread.holdsLock(this);
-
-        if (!cctx.deferredDelete())
-            return;
+        assert cctx.deferredDelete();
 
         if (deleted) {
             assert !deletedUnlocked() : this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index 0f63777..bea1000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -59,14 +59,8 @@ public class GridCacheUpdateTxResult {
     }
 
     /**
-     * Sets partition idx.
-     *
-     * @param partIdx Partition idx.
+     * @return Partition idx.
      */
-    public void partIdx(long partIdx) {
-        this.partIdx = partIdx;
-    }
-
     public long partIdx() {
         return partIdx;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 516b7bd..217073a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -882,8 +882,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
-        assert false;
-
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index ba6ff5c..1dc68cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,18 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -58,6 +46,19 @@ import org.jetbrains.annotations.NotNull;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
+import javax.cache.CacheException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
@@ -111,7 +112,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     private final LongAdder8 mapPubSize = new LongAdder8();
 
     /** Remove queue. */
-    private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+    private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
 
     /** Group reservations. */
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
@@ -144,8 +145,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
             Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
 
-        if (cctx.deferredDelete())
-            rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+        rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
     }
 
     /**
@@ -299,8 +299,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @throws IgniteCheckedException If failed.
      */
     public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException {
-        assert cctx.deferredDelete();
-
         try {
             T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver));
 
@@ -502,8 +500,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
 
-            if (cctx.deferredDelete())
-                clearDeferredDeletes();
+            clearDeferredDeletes();
 
             return new GridFinishedFuture<>(true);
         }
@@ -556,8 +553,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
 
-            if (cctx.deferredDelete())
-                clearDeferredDeletes();
+            clearDeferredDeletes();
 
             return true;
         }
@@ -800,8 +796,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     private void clearDeferredDeletes() {
-        assert cctx.deferredDelete();
-
         rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() {
             @Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) {
                 cctx.dht().removeVersionedEntry(t.get1(), t.get2());

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 4616b17..1195ddd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -229,7 +229,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             topReadyFut = exchFut;
 
-            rebalancedTopVer = AffinityTopologyVersion.NONE;;
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -1339,13 +1339,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
-        X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
+        X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
         for (GridDhtLocalPartition part : locParts.values()) {
             int size = part.size();
 
             if (size >= threshold)
-                X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
+                X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8eabae1..46799d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1808,10 +1808,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (conflictCtx.isMerge())
                             newConflictVer = null; // Conflict version is discarded in case of merge.
 
+                        EntryProcessor<Object, Object, Object> entryProcessor = null;
+
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
-                                op == TRANSFORM ? req.entryProcessor(i) : null,
+                                entryProcessor,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
                                 newConflictVer,
@@ -1824,6 +1826,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
+                                entryProcessor,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
                     }
@@ -2094,10 +2097,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
+                        EntryProcessor<Object, Object, Object> entryProcessor =
+                            entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
+
                         if (!batchRes.readersOnly())
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
-                                entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
+                                entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE,
                                 null,
@@ -2109,6 +2115,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 writeVal,
+                                entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
@@ -2513,6 +2520,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         CacheObject val = req.value(i);
                         CacheObject prevVal = req.previousValue(i);
+
                         EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
                         Long updateIdx = req.updateIdx(i);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 0d2f580..169e6a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -133,9 +133,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
 
         waitForExchange = !topLocked;
-
-        // We can send entry processor instead of value to backup if updates are ordered.
-        forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM;
     }
 
     /** {@inheritDoc} */
@@ -266,12 +263,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
+     * @param entryProcessor Entry processor.
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
      */
     public void addNearWriteEntries(Iterable<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
         CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -313,6 +312,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
             updateReq.addNearWriteValue(entry.key(),
                 val,
+                entryProcessor,
                 ttl,
                 expireTime);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 4d27bfd..0f29a90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -298,21 +298,36 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /**
      * @param key Key to add.
      * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
      * @param ttl TTL.
      * @param expireTime Expire time.
      */
     public void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime)
     {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
-            nearVals = new ArrayList<>();
+
+            if (forceTransformBackups) {
+                nearEntryProcessors = new ArrayList<>();
+                nearEntryProcessorsBytes = new ArrayList<>();
+            }
+            else
+                nearVals = new ArrayList<>();
         }
 
         nearKeys.add(key);
-        nearVals.add(val);
+
+        if (forceTransformBackups) {
+            assert entryProcessor != null;
+
+            nearEntryProcessors.add(entryProcessor);
+        }
+        else
+            nearVals.add(val);
 
         if (ttl >= 0) {
             if (nearTtls == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index f8bb8fb..4f2caa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -361,7 +361,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             expireTime,
                             null,
                             false,
-                            /*intercept*/false,
+                            intercept,
                             req.subjectId(),
                             taskName,
                             null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index eefbbae..d96c824 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -143,6 +143,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
+     * @return Cache ID.
+     */
+    int cacheId() {
+        return cacheId;
+    }
+
+    /**
      * @return Event type.
      */
     EventType eventType() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 14c1b8d..8e20fbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 2f9e111..4937ee7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -38,9 +38,7 @@ interface CacheContinuousQueryListener<K, V> {
      * @param primary Primary flag.
      * @param recordIgniteEvt Whether to record event.
      */
-    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
-        boolean primary,
-        boolean recordIgniteEvt);
+    public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
 
     /**
      * Listener unregistered callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/785539be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 65bb670..14fe195 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -183,7 +183,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     {
         assert e != null;
         assert key != null;
-        assert Thread.holdsLock(e) : e;
 
         boolean internal = e.isInternal() || !e.context().userCache();
 
@@ -661,7 +660,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             CacheEntryEventFilter fltr = null;
 
             if (cfg.getCacheEntryEventFilterFactory() != null) {
-                fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create();
+                fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
 
                 if (!(fltr instanceof Serializable))
                     throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "