You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/29 17:58:53 UTC

[01/10] incubator-ignite git commit: ignite-37 Improve offheap metrics for cache

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-943 d10fe3e90 -> d10120d67


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
new file mode 100644
index 0000000..3d44600
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.local;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.swapspace.file.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class CacheLocalOffHeapAndSwapMetricsSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 1;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 1000;
+
+    /** Max size. */
+    private static final int MAX_SIZE = 100;
+
+    /** Entry size. */
+    private static final int ENTRY_SIZE = 86; // Calculated as allocated size divided on entries count.
+
+    /** Offheap max count. */
+    private static final int OFFHEAP_MAX_CNT = KEYS_CNT / 2;
+
+    /** Offheap max size. */
+    private static final int OFFHEAP_MAX_SIZE = ENTRY_SIZE * OFFHEAP_MAX_CNT;
+
+    /** Cache. */
+    private IgniteCache<Integer, Integer> cache;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+
+        return cfg;
+    }
+
+    /**
+     * @param offHeapSize Max off-heap size.
+     * @param swapEnabled Swap enabled.
+     */
+    private void createCache(int offHeapSize, boolean swapEnabled) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setStatisticsEnabled(true);
+
+        ccfg.setCacheMode(CacheMode.LOCAL);
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+
+        ccfg.setOffHeapMaxMemory(offHeapSize);
+        ccfg.setSwapEnabled(swapEnabled);
+
+        ccfg.setEvictionPolicy(new FifoEvictionPolicy(MAX_SIZE));
+
+        cache = grid(0).getOrCreateCache(ccfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (cache != null)
+            cache.close();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testOffHeapMetrics() throws Exception {
+        createCache(0, false);
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.put(i, i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapGets());
+        assertEquals(0, cache.metrics().getOffHeapHits());
+        assertEquals(0f, cache.metrics().getOffHeapHitPercentage());
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapMisses());
+        assertEquals(100f, cache.metrics().getOffHeapMissPercentage());
+        assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(0, cache.metrics().getOffHeapEvictions());
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapEntriesCount());
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.get(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapGets());
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapHits());
+        assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getOffHeapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapMisses());
+        assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getOffHeapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(0, cache.metrics().getOffHeapEvictions());
+        assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        for (int i = KEYS_CNT; i < KEYS_CNT * 2; i++)
+            cache.get(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT * 3, cache.metrics().getOffHeapGets());
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapHits());
+        assertEquals(100 / 3.0, cache.metrics().getOffHeapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapMisses());
+        assertEquals(100 - (100 / 3.0), cache.metrics().getOffHeapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(0, cache.metrics().getOffHeapEvictions());
+        assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.remove(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT * 4 - MAX_SIZE, cache.metrics().getOffHeapGets());
+        assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getOffHeapHits());
+        assertEquals(100 * (KEYS_CNT * 2.0 - MAX_SIZE) / (KEYS_CNT * 4.0 - MAX_SIZE),
+            cache.metrics().getOffHeapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapMisses());
+        assertEquals(100 * KEYS_CNT * 2.0 / (KEYS_CNT * 4.0 - MAX_SIZE),
+            cache.metrics().getOffHeapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(0, cache.metrics().getOffHeapEvictions());
+        assertEquals(0, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testSwapMetrics() throws Exception {
+        createCache(-1, true);
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.put(i, i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapGets());
+        assertEquals(0, cache.metrics().getSwapHits());
+        assertEquals(0f, cache.metrics().getSwapHitPercentage());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+        assertEquals(100f, cache.metrics().getSwapMissPercentage());
+        assertEquals(0, cache.metrics().getSwapRemovals());
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapEntriesCount());
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.get(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT * 2, cache.metrics().getSwapGets());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+        assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+        assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+        assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getSwapEntriesCount());
+
+        for (int i = KEYS_CNT; i < KEYS_CNT * 2; i++)
+            cache.get(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT * 3, cache.metrics().getSwapGets());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+        assertEquals(100 / 3.0, cache.metrics().getSwapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+        assertEquals(100 - (100 / 3.0), cache.metrics().getSwapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+        assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getSwapEntriesCount());
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.remove(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT * 4 - MAX_SIZE, cache.metrics().getSwapGets());
+        assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getSwapHits());
+        assertEquals(100 * (KEYS_CNT * 2.0 - MAX_SIZE) / (KEYS_CNT * 4.0 - MAX_SIZE),
+            cache.metrics().getSwapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+        assertEquals(100 * KEYS_CNT * 2.0 / (KEYS_CNT * 4.0 - MAX_SIZE),
+            cache.metrics().getSwapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getSwapRemovals());
+
+        assertEquals(0, cache.metrics().getSwapEntriesCount());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testOffHeapAndSwapMetrics() throws Exception {
+        createCache(OFFHEAP_MAX_SIZE, true);
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.put(i, i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapGets());
+        assertEquals(0, cache.metrics().getOffHeapHits());
+        assertEquals(0f, cache.metrics().getOffHeapHitPercentage());
+        assertEquals(KEYS_CNT, cache.metrics().getOffHeapMisses());
+        assertEquals(100f, cache.metrics().getOffHeapMissPercentage());
+        assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(KEYS_CNT - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapGets());
+        assertEquals(0, cache.metrics().getSwapHits());
+        assertEquals(0f, cache.metrics().getSwapHitPercentage());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+        assertEquals(100f, cache.metrics().getSwapMissPercentage());
+        assertEquals(0, cache.metrics().getSwapRemovals());
+
+        assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapEntriesCount());
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.get(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapGets());
+        assertEquals(0, cache.metrics().getOffHeapHits());
+        assertEquals(0.0, cache.metrics().getOffHeapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapMisses());
+        assertEquals(100.0, cache.metrics().getOffHeapMissPercentage(), 0.1);
+        assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(cache.metrics().getCacheEvictions() - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT * 2, cache.metrics().getSwapGets());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+        assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+        assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+        assertEquals(KEYS_CNT - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapEntriesCount());
+
+        for (int i = KEYS_CNT; i < KEYS_CNT * 2; i++)
+            cache.get(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT * 3, cache.metrics().getOffHeapGets());
+        assertEquals(0, cache.metrics().getOffHeapHits());
+        assertEquals(0.0, cache.metrics().getOffHeapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 3, cache.metrics().getOffHeapMisses());
+        assertEquals(100.0, cache.metrics().getOffHeapMissPercentage(), 0.1);
+        assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(cache.metrics().getCacheEvictions() - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT * 3, cache.metrics().getSwapGets());
+        assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+        assertEquals(100 / 3.0, cache.metrics().getSwapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+        assertEquals(100 - (100 / 3.0), cache.metrics().getSwapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+        assertEquals(KEYS_CNT - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapEntriesCount());
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.remove(i);
+
+        printStat();
+
+        assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+        assertEquals(KEYS_CNT * 4 - MAX_SIZE, cache.metrics().getOffHeapGets());
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapHits());
+        assertEquals(100 * OFFHEAP_MAX_CNT / (KEYS_CNT * 4.0 - MAX_SIZE),
+            cache.metrics().getOffHeapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 4 - OFFHEAP_MAX_CNT - MAX_SIZE, cache.metrics().getOffHeapMisses());
+        assertEquals(100 * (KEYS_CNT * 4 - OFFHEAP_MAX_CNT - MAX_SIZE) / (KEYS_CNT * 4.0 - MAX_SIZE),
+            cache.metrics().getOffHeapMissPercentage(), 0.1);
+        assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapRemovals());
+
+        assertEquals(cache.metrics().getCacheEvictions() - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+        assertEquals(0, cache.metrics().getOffHeapEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapPrimaryEntriesCount());
+        assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+        assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+        assertEquals(KEYS_CNT * 4 - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapGets());
+        assertEquals(KEYS_CNT * 2 - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapHits());
+        assertEquals(100 * (KEYS_CNT * 2.0 - MAX_SIZE - OFFHEAP_MAX_CNT) / (KEYS_CNT * 4.0 - MAX_SIZE - OFFHEAP_MAX_CNT),
+            cache.metrics().getSwapHitPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+        assertEquals(100 * KEYS_CNT * 2.0 / (KEYS_CNT * 4.0 - MAX_SIZE - OFFHEAP_MAX_CNT),
+            cache.metrics().getSwapMissPercentage(), 0.1);
+        assertEquals(KEYS_CNT * 2 - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapRemovals());
+
+        assertEquals(0, cache.metrics().getSwapEntriesCount());
+    }
+
+    /**
+     * Prints stats.
+     */
+    protected void printStat() {
+        System.out.println("!!! -------------------------------------------------------");
+        System.out.println("!!! Puts: cache = " + cache.metrics().getCachePuts() +
+            ", offheap = " + cache.metrics().getOffHeapPuts() +
+            ", swap = " + cache.metrics().getSwapPuts());
+        System.out.println("!!! Gets: cache = " + cache.metrics().getCacheGets() +
+            ", offheap = " + cache.metrics().getOffHeapGets() +
+            ", swap = " + cache.metrics().getSwapGets());
+        System.out.println("!!! Removes: cache = " + cache.metrics().getCacheRemovals() +
+            ", offheap = " + cache.metrics().getOffHeapRemovals() +
+            ", swap = " + cache.metrics().getSwapRemovals());
+        System.out.println("!!! Evictions: cache = " + cache.metrics().getCacheEvictions() +
+            ", offheap = " + cache.metrics().getOffHeapEvictions() +
+            ", swap = none" );
+        System.out.println("!!! Hits: cache = " + cache.metrics().getCacheHits() +
+            ", offheap = " + cache.metrics().getOffHeapHits() +
+            ", swap = " + cache.metrics().getSwapHits());
+        System.out.println("!!! Hit(%): cache = " + cache.metrics().getCacheHitPercentage() +
+            ", offheap = " + cache.metrics().getOffHeapHitPercentage() +
+            ", swap = " + cache.metrics().getSwapHitPercentage());
+        System.out.println("!!! Misses: cache = " + cache.metrics().getCacheMisses() +
+            ", offheap = " + cache.metrics().getOffHeapMisses() +
+            ", swap = " + cache.metrics().getSwapMisses());
+        System.out.println("!!! Miss(%): cache = " + cache.metrics().getCacheMissPercentage() +
+            ", offheap = " + cache.metrics().getOffHeapMissPercentage() +
+            ", swap = " + cache.metrics().getSwapMissPercentage());
+        System.out.println("!!! Entries: cache = " + cache.metrics().getSize() +
+            ", offheap = " + cache.metrics().getOffHeapEntriesCount() +
+            ", swap = " + cache.metrics().getSwapEntriesCount());
+        System.out.println("!!! Size: cache = none" +
+            ", offheap = " + cache.metrics().getOffHeapAllocatedSize() +
+            ", swap = " + cache.metrics().getSwapSize());
+        System.out.println();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index bc04f90..5867fb8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,7 +29,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -447,28 +447,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
-    @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
-        @Nullable ClassLoader ldr) {
-        /* No-op. */
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public int partition(String cacheName, Object key) {
         return -1;
     }
 
     /** {@inheritDoc} */
-    @Override public void removeFromSwap(String spaceName, Object key,
-        @Nullable ClassLoader ldr) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
         return null;
     }
@@ -484,12 +467,6 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
-        @Nullable ClassLoader ldr) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public MessageFormatter messageFormatter() {
         if (formatter == null) {
             formatter = new MessageFormatter() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index 1adf55f..9a0e5fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -39,6 +39,7 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheReplicatedMetricsSelfTest.class);
         suite.addTestSuite(GridCachePartitionedMetricsSelfTest.class);
         suite.addTestSuite(GridCachePartitionedHitsAndMissesSelfTest.class);
+        suite.addTestSuite(CacheLocalOffHeapAndSwapMetricsSelfTest.class);
 
         // Atomic cache.
         suite.addTestSuite(GridCacheAtomicLocalMetricsSelfTest.class);


[09/10] incubator-ignite git commit: # regenerated PDF

Posted by se...@apache.org.
# regenerated PDF


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5c30f9cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5c30f9cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5c30f9cf

Branch: refs/heads/ignite-943
Commit: 5c30f9cf5c490b0d6c065e89557a8b8f3040eda5
Parents: 6cd1a6e 3f012b7
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 29 18:42:16 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 29 18:42:16 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMetrics.java   | 187 +++++++--
 .../internal/managers/GridManagerAdapter.java   |  59 +--
 .../processors/cache/CacheMetricsImpl.java      | 305 +++++++++++++-
 .../cache/CacheMetricsMXBeanImpl.java           | 100 +++++
 .../processors/cache/CacheMetricsSnapshot.java  | 380 +++++++++++++----
 .../processors/cache/GridCacheSwapManager.java  | 118 ++++--
 .../ignite/mxbean/CacheMetricsMXBean.java       |  80 ++++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  35 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  47 ---
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |   8 +-
 ...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 412 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       |  25 +-
 .../IgniteCacheMetricsSelfTestSuite.java        |   1 +
 13 files changed, 1457 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5c30f9cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index af19077,3dcda3c..4e6a447
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@@ -129,24 -161,162 +164,168 @@@ public class CacheMetricsImpl implement
      }
  
      /** {@inheritDoc} */
+     @Override public long getOffHeapGets() {
+         return offHeapGets.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getOffHeapPuts() {
+         return offHeapPuts.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getOffHeapRemovals() {
+         return offHeapRemoves.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getOffHeapEvictions() {
+         return offHeapEvicts.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getOffHeapHits() {
+         return offHeapHits.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public float getOffHeapHitPercentage() {
+         long hits0 = offHeapHits.get();
+         long gets0 = offHeapGets.get();
+ 
+         if (hits0 == 0)
+             return 0;
+ 
+         return (float) hits0 / gets0 * 100.0f;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getOffHeapMisses() {
+         return offHeapMisses.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public float getOffHeapMissPercentage() {
+         long misses0 = offHeapMisses.get();
+         long reads0 = offHeapGets.get();
+ 
+         if (misses0 == 0)
+             return 0;
+ 
+         return (float) misses0 / reads0 * 100.0f;
+     }
+ 
+     /** {@inheritDoc} */
      @Override public long getOffHeapEntriesCount() {
 -        return cctx.cache().offHeapEntriesCount();
 +        GridCacheAdapter<?, ?> cache = cctx.cache();
 +
 +        return cache != null ? cache.offHeapEntriesCount() : -1;
      }
  
      /** {@inheritDoc} */
+     @Override public long getOffHeapPrimaryEntriesCount() {
+         try {
+             return cctx.swap().offheapEntriesCount(true, false, NONE);
+         }
+         catch (IgniteCheckedException e) {
+             return 0;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getOffHeapBackupEntriesCount() {
+         try {
+             return cctx.swap().offheapEntriesCount(false, true, NONE);
+         }
+         catch (IgniteCheckedException e) {
+             return 0;
+         }
+     }
+ 
+     /** {@inheritDoc} */
      @Override public long getOffHeapAllocatedSize() {
 -        return cctx.cache().offHeapAllocatedSize();
 +        GridCacheAdapter<?, ?> cache = cctx.cache();
 +
 +        return cache != null ? cache.offHeapAllocatedSize() : -1;
      }
  
      /** {@inheritDoc} */
+     @Override public long getOffHeapMaxSize() {
+         return cctx.config().getOffHeapMaxMemory();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapGets() {
+         return swapGets.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapPuts() {
+         return swapPuts.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapRemovals() {
+         return swapRemoves.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapHits() {
+         return swapHits.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapMisses() {
+         return swapMisses.get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapEntriesCount() {
+         try {
+             return cctx.cache().swapKeys();
+         }
+         catch (IgniteCheckedException e) {
+             return 0;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getSwapSize() {
+         try {
+             return cctx.cache().swapSize();
+         }
+         catch (IgniteCheckedException e) {
+             return 0;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public float getSwapHitPercentage() {
+         long hits0 = swapHits.get();
+         long gets0 = swapGets.get();
+ 
+         if (hits0 == 0)
+             return 0;
+ 
+         return (float) hits0 / gets0 * 100.0f;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public float getSwapMissPercentage() {
+         long misses0 = swapMisses.get();
+         long reads0 = swapGets.get();
+ 
+         if (misses0 == 0)
+             return 0;
+ 
+         return (float) misses0 / reads0 * 100.0f;
+     }
+ 
+     /** {@inheritDoc} */
      @Override public int getSize() {
 -        return cctx.cache().size();
 +        GridCacheAdapter<?, ?> cache = cctx.cache();
 +
 +        return cache != null ? cache.size() : 0;
      }
  
      /** {@inheritDoc} */
@@@ -606,11 -769,111 +797,113 @@@
  
      /** {@inheritDoc} */
      @Override public boolean isManagementEnabled() {
 -        return cctx.config().isManagementEnabled();
 +        CacheConfiguration ccfg = cctx.config();
 +
 +        return ccfg != null && ccfg.isManagementEnabled();
      }
  
+     /**
+      * Off-heap read callback.
+      *
+      * @param hit Hit or miss flag.
+      */
+     public void onOffHeapRead(boolean hit) {
+         offHeapGets.incrementAndGet();
+ 
+         if (hit)
+             offHeapHits.incrementAndGet();
+         else
+             offHeapMisses.incrementAndGet();
+ 
+         if (delegate != null)
+             delegate.onOffHeapRead(hit);
+     }
+ 
+     /**
+      * Off-heap write callback.
+      */
+     public void onOffHeapWrite() {
+         offHeapPuts.incrementAndGet();
+ 
+         if (delegate != null)
+             delegate.onOffHeapWrite();
+     }
+ 
+     /**
+      * Off-heap remove callback.
+      */
+     public void onOffHeapRemove() {
+         offHeapRemoves.incrementAndGet();
+ 
+         if (delegate != null)
+             delegate.onOffHeapRemove();
+     }
+ 
+     /**
+      * Off-heap evict callback.
+      */
+     public void onOffHeapEvict() {
+         offHeapEvicts.incrementAndGet();
+ 
+         if (delegate != null)
+             delegate.onOffHeapRemove();
+     }
+ 
+     /**
+      * Swap read callback.
+      *
+      * @param hit Hit or miss flag.
+      */
+     public void onSwapRead(boolean hit) {
+         swapGets.incrementAndGet();
+ 
+         if (hit)
+             swapHits.incrementAndGet();
+         else
+             swapMisses.incrementAndGet();
+ 
+         if (delegate != null)
+             delegate.onSwapRead(hit);
+     }
+ 
+     /**
+      * Swap write callback.
+      */
+     public void onSwapWrite() {
+         onSwapWrite(1);
+     }
+ 
+     /**
+      * Swap write callback.
+      *
+      * @param cnt Amount of entries.
+      */
+     public void onSwapWrite(int cnt) {
+         swapPuts.addAndGet(cnt);
+ 
+         if (delegate != null)
+             delegate.onSwapWrite(cnt);
+     }
+ 
+     /**
+      * Swap remove callback.
+      */
+     public void onSwapRemove() {
+         onSwapRemove(1);
+     }
+ 
+     /**
+      * Swap remove callback.
+      *
+      * @param cnt Amount of entries.
+      */
+     public void onSwapRemove(int cnt) {
+         swapRemoves.addAndGet(cnt);
+ 
+         if (delegate != null)
+             delegate.onSwapRemove(cnt);
+     }
+ 
      /** {@inheritDoc} */
      @Override public String toString() {
          return S.toString(CacheMetricsImpl.class, this);


[10/10] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-943

Posted by se...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-943


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d10120d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d10120d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d10120d6

Branch: refs/heads/ignite-943
Commit: d10120d67d3d6e20b25e494b03b829b20f263b5b
Parents: d10fe3e 5c30f9c
Author: sevdokimov <se...@gridgain.com>
Authored: Fri May 29 18:57:20 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Fri May 29 18:57:20 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMetrics.java   | 187 +++++++--
 .../org/apache/ignite/igfs/IgfsUserContext.java | 119 ++++++
 .../igfs/secondary/IgfsSecondaryFileSystem.java |   7 +
 .../internal/igfs/common/IgfsMarshaller.java    |  35 +-
 .../igfs/common/IgfsPathControlRequest.java     |  22 +
 .../internal/managers/GridManagerAdapter.java   |  59 +--
 .../processors/cache/CacheMetricsImpl.java      | 367 ++++++++++++++++-
 .../cache/CacheMetricsMXBeanImpl.java           | 100 +++++
 .../processors/cache/CacheMetricsSnapshot.java  | 380 +++++++++++++----
 .../processors/cache/GridCacheAdapter.java      |  12 +-
 .../processors/cache/GridCacheSwapManager.java  | 118 ++++--
 .../internal/processors/hadoop/HadoopJob.java   |   2 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |   8 +-
 .../internal/processors/igfs/IgfsImpl.java      |   8 +-
 .../processors/igfs/IgfsIpcHandler.java         | 184 +++++----
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   9 +-
 .../internal/processors/igfs/IgfsServer.java    |   4 +-
 .../internal/processors/igfs/IgfsUtils.java     |  16 +
 .../ignite/internal/util/GridJavaProcess.java   |  30 +-
 .../ignite/mxbean/CacheMetricsMXBean.java       |  80 ++++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  35 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  47 ---
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |   8 +-
 ...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 412 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       |  25 +-
 .../IgniteCacheMetricsSelfTestSuite.java        |   1 +
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 +++++---
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 107 +++--
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  32 +-
 .../internal/processors/hadoop/HadoopUtils.java |  10 +-
 .../hadoop/SecondaryFileSystemProvider.java     |  53 ++-
 .../hadoop/fs/HadoopDistributedFileSystem.java  |  91 ----
 .../hadoop/fs/HadoopFileSystemsUtils.java       |  17 -
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 204 +++++++++
 .../processors/hadoop/igfs/HadoopIgfsEx.java    |   6 +
 .../hadoop/igfs/HadoopIgfsInProc.java           | 170 ++++++--
 .../processors/hadoop/igfs/HadoopIgfsIpcIo.java |   2 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |  33 +-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |  19 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |   4 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |  56 ++-
 ...oopSecondaryFileSystemConfigurationTest.java |   4 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |  63 ++-
 .../IgniteHadoopFileSystemClientSelfTest.java   |   2 +-
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   2 +
 .../hadoop/HadoopFileSystemsTest.java           |  23 +-
 .../collections/HadoopSkipListSelfTest.java     |   4 +-
 47 files changed, 2537 insertions(+), 805 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d10120d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d10120d6/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------


[06/10] incubator-ignite git commit: ignite-866 NPE during clean up

Posted by se...@apache.org.
ignite-866 NPE during clean up


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2d9a938a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2d9a938a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2d9a938a

Branch: refs/heads/ignite-943
Commit: 2d9a938a281887fdae804ac68a89e93e5ad1b02b
Parents: 8455c7a
Author: agura <ag...@gridgain.com>
Authored: Thu May 14 14:40:38 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 29 16:43:38 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheMetricsImpl.java      | 62 +++++++++++++++-----
 .../processors/cache/GridCacheAdapter.java      | 12 +++-
 2 files changed, 55 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d9a938a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..af19077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -118,7 +119,9 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** {@inheritDoc} */
     @Override public long getOverflowSize() {
         try {
-            return cctx.cache().overflowSize();
+            GridCacheAdapter<?, ?> cache = cctx.cache();
+
+            return cache != null ? cache.overflowSize() : -1;
         }
         catch (IgniteCheckedException ignored) {
             return -1;
@@ -127,34 +130,47 @@ public class CacheMetricsImpl implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public long getOffHeapEntriesCount() {
-        return cctx.cache().offHeapEntriesCount();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache != null ? cache.offHeapEntriesCount() : -1;
     }
 
     /** {@inheritDoc} */
     @Override public long getOffHeapAllocatedSize() {
-        return cctx.cache().offHeapAllocatedSize();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache != null ? cache.offHeapAllocatedSize() : -1;
     }
 
     /** {@inheritDoc} */
     @Override public int getSize() {
-        return cctx.cache().size();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache != null ? cache.size() : 0;
     }
 
     /** {@inheritDoc} */
     @Override public int getKeySize() {
-        return cctx.cache().size();
+        return getSize();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isEmpty() {
-        return cctx.cache().isEmpty();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache == null || cache.isEmpty();
     }
 
     /** {@inheritDoc} */
     @Override public int getDhtEvictQueueCurrentSize() {
-        return cctx.isNear() ?
-                dhtCtx != null ? dhtCtx.evicts().evictQueueSize() : -1
-                : cctx.evicts().evictQueueSize();
+        GridCacheContext<?, ?> ctx = cctx.isNear() ? dhtCtx : cctx;
+
+        if (ctx == null)
+            return -1;
+
+        GridCacheEvictionManager evictMgr = ctx.evicts();
+
+        return evictMgr != null ? evictMgr.evictQueueSize() : -1;
     }
 
     /** {@inheritDoc} */
@@ -548,37 +564,51 @@ public class CacheMetricsImpl implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public String getKeyType() {
-        return cctx.config().getKeyType().getName();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null ? ccfg.getKeyType().getName() : null;
     }
 
     /** {@inheritDoc} */
     @Override public String getValueType() {
-        return cctx.config().getValueType().getName();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null ? ccfg.getValueType().getName() : null;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isReadThrough() {
-        return cctx.config().isReadThrough();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isReadThrough();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isWriteThrough() {
-        return cctx.config().isWriteThrough();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isWriteThrough();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isStoreByValue() {
-        return cctx.config().isStoreByValue();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isStoreByValue();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isStatisticsEnabled() {
-        return cctx.config().isStatisticsEnabled();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isStatisticsEnabled();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isManagementEnabled() {
-        return cctx.config().isManagementEnabled();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isManagementEnabled();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d9a938a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..c975961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3249,7 +3249,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public long overflowSize() throws IgniteCheckedException {
-        return ctx.swap().swapSize();
+        GridCacheSwapManager swapMgr = ctx.swap();
+
+        return swapMgr != null ? swapMgr.swapSize() : -1;
     }
 
     /**
@@ -3802,12 +3804,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public long offHeapEntriesCount() {
-        return ctx.swap().offHeapEntriesCount();
+        GridCacheSwapManager swapMgr = ctx.swap();
+
+        return swapMgr != null ? swapMgr.offHeapEntriesCount() : -1;
     }
 
     /** {@inheritDoc} */
     @Override public long offHeapAllocatedSize() {
-        return ctx.swap().offHeapAllocatedSize();
+        GridCacheSwapManager swapMgr = ctx.swap();
+
+        return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;
     }
 
     /** {@inheritDoc} */


[08/10] incubator-ignite git commit: Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-sprint-5

Posted by se...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6cd1a6e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6cd1a6e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6cd1a6e5

Branch: refs/heads/ignite-943
Commit: 6cd1a6e55880571e09785afc824e258070242284
Parents: 2d9a938 5df0668
Author: agura <ag...@gridgain.com>
Authored: Fri May 29 16:59:49 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 29 16:59:49 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/GridJavaProcess.java   | 30 ++++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[04/10] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.

Posted by se...@apache.org.
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35388195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35388195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35388195

Branch: refs/heads/ignite-943
Commit: 353881951fcdcc16c3dc31d808d3af6c263f74ce
Parents: 7ec4c82
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:31:35 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:31:35 2015 +0300

----------------------------------------------------------------------
 .../igfs/secondary/IgfsSecondaryFileSystem.java |   7 +
 .../internal/igfs/common/IgfsMarshaller.java    |  35 +---
 .../igfs/common/IgfsPathControlRequest.java     |  22 +++
 .../internal/processors/hadoop/HadoopJob.java   |   2 +-
 .../ignite/internal/processors/igfs/IgfsEx.java |   8 +-
 .../internal/processors/igfs/IgfsImpl.java      |   8 +-
 .../processors/igfs/IgfsIpcHandler.java         | 184 ++++++++++---------
 .../igfs/IgfsSecondaryFileSystemImpl.java       |   9 +-
 .../internal/processors/igfs/IgfsServer.java    |   4 +-
 .../internal/processors/igfs/IgfsUtils.java     |  16 ++
 .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 ++++++++++++-----
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    | 107 +++++++----
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |  32 +++-
 .../internal/processors/hadoop/HadoopUtils.java |  10 +-
 .../hadoop/SecondaryFileSystemProvider.java     |  53 +++---
 .../hadoop/fs/HadoopDistributedFileSystem.java  |  91 ---------
 .../hadoop/fs/HadoopFileSystemsUtils.java       |  17 --
 .../processors/hadoop/igfs/HadoopIgfsEx.java    |   6 +
 .../hadoop/igfs/HadoopIgfsInProc.java           | 170 ++++++++++++-----
 .../processors/hadoop/igfs/HadoopIgfsIpcIo.java |   2 +-
 .../hadoop/igfs/HadoopIgfsOutProc.java          |  33 +++-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |  19 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |   4 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |  56 ++++--
 ...oopSecondaryFileSystemConfigurationTest.java |   4 +-
 .../IgniteHadoopFileSystemAbstractSelfTest.java |  63 +++++--
 .../IgniteHadoopFileSystemClientSelfTest.java   |   2 +-
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   2 +
 .../hadoop/HadoopFileSystemsTest.java           |  23 +--
 .../collections/HadoopSkipListSelfTest.java     |   4 +-
 30 files changed, 684 insertions(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
      * @return Map of properties.
      */
     public Map<String,String> properties();
+
+
+    /**
+     * Closes the secondary file system.
+     * @throws IgniteException in case of an error.
+     */
+    public void close() throws IgniteException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
     }
 
     /**
+     * Serializes the message and sends it into the given output stream.
      * @param msg Message.
      * @param hdr Message header.
      * @param out Output.
@@ -119,6 +120,7 @@ public class IgfsMarshaller {
 
                     IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
+                    U.writeString(out, req.userName());
                     writePath(out, req.path());
                     writePath(out, req.destinationPath());
                     out.writeBoolean(req.flag());
@@ -236,6 +238,7 @@ public class IgfsMarshaller {
                 case OPEN_CREATE: {
                     IgfsPathControlRequest req = new IgfsPathControlRequest();
 
+                    req.userName(U.readString(in));
                     req.path(readPath(in));
                     req.destinationPath(readPath(in));
                     req.flag(in.readBoolean());
@@ -298,8 +301,6 @@ public class IgfsMarshaller {
                 }
             }
 
-            assert msg != null;
-
             msg.command(cmd);
 
             return msg;
@@ -341,34 +342,4 @@ public class IgfsMarshaller {
 
         return null;
     }
-
-    /**
-     * Writes string to output.
-     *
-     * @param out Data output.
-     * @param str String.
-     * @throws IOException If write failed.
-     */
-    private void writeString(DataOutput out, @Nullable String str) throws IOException {
-        out.writeBoolean(str != null);
-
-        if (str != null)
-            out.writeUTF(str);
-    }
-
-    /**
-     * Reads string from input.
-     *
-     * @param in Data input.
-     * @return Read string.
-     * @throws IOException If read failed.
-     */
-    @Nullable private String readString(DataInput in) throws IOException {
-        boolean hasStr = in.readBoolean();
-
-        if (hasStr)
-            return in.readUTF();
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.igfs.common;
 
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage {
     /** Last modification time. */
     private long modificationTime;
 
+    /** The user name this control request is made on behalf of. */
+    private String userName;
+
     /**
      * @param path Path.
      */
@@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage {
     @Override public String toString() {
         return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
     }
+
+    /**
+     * Getter for the user name.
+     * @return user name.
+     */
+    public final String userName() {
+        assert userName != null;
+
+        return userName;
+    }
+
+    /**
+     * Setter for the user name.
+     * @param userName the user name.
+     */
+    public final void userName(String userName) {
+        this.userName = IgfsUtils.fixUserName(userName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
     /**
      * Cleans up the job staging directory.
      */
-    void cleanupStagingDirectory();
+    public void cleanupStagingDirectory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 7c1a837..361f75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
     /** Property name for URI of file system. */
     public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
 
-    /** Property name for user name of file system. */
-    public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
+    /** Property name for default user name of file system.
+     * NOTE: for secondary file system this is just a default user name, which is used
+     * when the 2ndary filesystem is used outside of any user context.
+     * If another user name is set in the context, 2ndary file system will work on behalf
+     * of that user, which is different from the default. */
+     public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
 
     /**
      * Stops IGFS cleaning all used resources.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 34636d2..c3495e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx {
             for (IgfsFileWorkerBatch batch : workerMap.values())
                 batch.cancel();
 
-            if (secondaryFs instanceof AutoCloseable)
-                U.closeQuiet((AutoCloseable)secondaryFs);
+            try {
+                secondaryFs.close();
+            }
+            catch (Exception e) {
+                log.error("Failed to close secondary file system.", e);
+            }
         }
 
         igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..cfe6ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
     private final int bufSize; // Buffer size. Must not be less then file block size.
 
     /** IGFS instance for this handler. */
-    private IgfsEx igfs;
+    private final IgfsEx igfs;
 
     /** Resource ID generator. */
-    private AtomicLong rsrcIdGen = new AtomicLong();
+    private final AtomicLong rsrcIdGen = new AtomicLong();
 
     /** Stopping flag. */
     private volatile boolean stopping;
@@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Response message.
      * @throws IgniteCheckedException If failed.
      */
-    private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+    private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
         IgfsMessage msg) throws IgniteCheckedException {
-        IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+        final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
         if (log.isDebugEnabled())
             log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
 
-        IgfsControlResponse res = new IgfsControlResponse();
+        final IgfsControlResponse res = new IgfsControlResponse();
+
+        final String userName = req.userName();
+
+        assert userName != null;
 
         try {
-            switch (cmd) {
-                case EXISTS:
-                    res.response(igfs.exists(req.path()));
+            IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+                @Override public Void apply() {
+                    switch (cmd) {
+                        case EXISTS:
+                            res.response(igfs.exists(req.path()));
 
-                    break;
+                            break;
 
-                case INFO:
-                    res.response(igfs.info(req.path()));
+                        case INFO:
+                            res.response(igfs.info(req.path()));
 
-                    break;
+                            break;
 
-                case PATH_SUMMARY:
-                    res.response(igfs.summary(req.path()));
+                        case PATH_SUMMARY:
+                            res.response(igfs.summary(req.path()));
 
-                    break;
+                            break;
 
-                case UPDATE:
-                    res.response(igfs.update(req.path(), req.properties()));
+                        case UPDATE:
+                            res.response(igfs.update(req.path(), req.properties()));
 
-                    break;
+                            break;
 
-                case RENAME:
-                    igfs.rename(req.path(), req.destinationPath());
+                        case RENAME:
+                            igfs.rename(req.path(), req.destinationPath());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case DELETE:
-                    res.response(igfs.delete(req.path(), req.flag()));
+                        case DELETE:
+                            res.response(igfs.delete(req.path(), req.flag()));
 
-                    break;
+                            break;
 
-                case MAKE_DIRECTORIES:
-                    igfs.mkdirs(req.path(), req.properties());
+                        case MAKE_DIRECTORIES:
+                            igfs.mkdirs(req.path(), req.properties());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case LIST_PATHS:
-                    res.paths(igfs.listPaths(req.path()));
+                        case LIST_PATHS:
+                            res.paths(igfs.listPaths(req.path()));
 
-                    break;
+                            break;
 
-                case LIST_FILES:
-                    res.files(igfs.listFiles(req.path()));
+                        case LIST_FILES:
+                            res.files(igfs.listFiles(req.path()));
 
-                    break;
+                            break;
 
-                case SET_TIMES:
-                    igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+                        case SET_TIMES:
+                            igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case AFFINITY:
-                    res.locations(igfs.affinity(req.path(), req.start(), req.length()));
+                        case AFFINITY:
+                            res.locations(igfs.affinity(req.path(), req.start(), req.length()));
 
-                    break;
+                            break;
 
-                case OPEN_READ: {
-                    IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
-                        igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+                        case OPEN_READ: {
+                            IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+                                igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
 
-                    long streamId = registerResource(ses, igfsIn);
+                            long streamId = registerResource(ses, igfsIn);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
-                        igfsIn.fileInfo().modificationTime());
+                            IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+                                igfsIn.fileInfo().modificationTime());
 
-                    res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+                            res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_CREATE: {
-                    long streamId = registerResource(ses, igfs.create(
-                        req.path(),       // Path.
-                        bufSize,          // Buffer size.
-                        req.flag(),       // Overwrite if exists.
-                        affinityKey(req), // Affinity key based on replication factor.
-                        req.replication(),// Replication factor.
-                        req.blockSize(),  // Block size.
-                        req.properties()  // File properties.
-                    ));
+                        case OPEN_CREATE: {
+                            long streamId = registerResource(ses, igfs.create(
+                                req.path(),       // Path.
+                                bufSize,          // Buffer size.
+                                req.flag(),       // Overwrite if exists.
+                                affinityKey(req), // Affinity key based on replication factor.
+                                req.replication(),// Replication factor.
+                                req.blockSize(),  // Block size.
+                                req.properties()  // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_APPEND: {
-                    long streamId = registerResource(ses, igfs.append(
-                        req.path(),        // Path.
-                        bufSize,           // Buffer size.
-                        req.flag(),        // Create if absent.
-                        req.properties()   // File properties.
-                    ));
+                        case OPEN_APPEND: {
+                            long streamId = registerResource(ses, igfs.append(
+                                req.path(),        // Path.
+                                bufSize,           // Buffer size.
+                                req.flag(),        // Create if absent.
+                                req.properties()   // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                default:
-                    assert false : "Unhandled path control request command: " + cmd;
+                        default:
+                            assert false : "Unhandled path control request command: " + cmd;
 
-                    break;
-            }
+                            break;
+                    }
+
+                    return null;
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..b8095b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
  */
 class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     /** Delegate. */
-    private final IgfsImpl igfs;
+    private final IgfsEx igfs;
 
     /**
      * Constructor.
      *
      * @param igfs Delegate.
      */
-    IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+    IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
         this.igfs = igfs;
     }
 
@@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     @Override public Map<String, String> properties() {
         return Collections.emptyMap();
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        igfs.stop(true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
      */
     private class ClientWorker extends GridWorker {
         /** Connected client endpoint. */
-        private IpcEndpoint endpoint;
+        private final IpcEndpoint endpoint;
 
         /** Data output stream. */
         private final IgfsDataOutputStream out;
 
         /** Client session object. */
-        private IgfsClientSession ses;
+        private final IgfsClientSession ses;
 
         /** Queue node for fast unlink. */
         private ConcurrentLinkedDeque8.Node<ClientWorker> node;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 4b0234f..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 
@@ -88,4 +90,18 @@ public class IgfsUtils {
     private IgfsUtils() {
         // No-op.
     }
+
+    /**
+     * Provides non-null user name.
+     * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+     * which is the current process owner user.
+     * @param user a user name to be fixed.
+     * @return non-null interned user name.
+     */
+    public static String fixUserName(@Nullable String user) {
+        if (F.isEmpty(user))
+           user = FileSystemConfiguration.DFLT_USER_NAME;
+
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index ba891f8..6a630fb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.igfs.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.jetbrains.annotations.*;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*;
 
 import java.io.*;
 import java.net.*;
@@ -37,15 +38,45 @@ import java.util.*;
 import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
 
 /**
- * Adapter to use any Hadoop file system {@link FileSystem} as  {@link IgfsSecondaryFileSystem}.
+ * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * In fact, this class deals with different FileSystems depending on the user context,
+ * see {@link IgfsUserContext#currentUser()}.
  */
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable {
-    /** Hadoop file system. */
-    private final FileSystem fileSys;
-
-    /** Properties of file system */
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem {
+    /** Properties of file system, see {@link #properties()}
+     *
+     * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}
+     * See {@link IgfsEx#SECONDARY_FS_URI}
+     * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
+     * */
     private final Map<String, String> props = new HashMap<>();
 
+    /** Secondary file system provider. */
+    private final SecondaryFileSystemProvider secProvider;
+
+    /** The default user name. It is used if no user context is set. */
+    private final String dfltUserName;
+
+    /** FileSystem instance created for the default user.
+     * Stored outside the fileSysLazyMap due to performance reasons. */
+    private final FileSystem dfltFs;
+
+    /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+    private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+        new ValueFactory<String, FileSystem>() {
+            @Override public FileSystem createValue(String key) {
+                try {
+                    assert !F.isEmpty(key);
+
+                    return secProvider.createFileSystem(key);
+                }
+                catch (IOException ioe) {
+                    throw new IgniteException(ioe);
+                }
+            }
+        }
+    );
+
     /**
      * Simple constructor that is to be used by default.
      *
@@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @throws IgniteCheckedException In case of error.
      */
     public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
-        @Nullable String userName)
-            throws IgniteCheckedException {
+        @Nullable String userName) throws IgniteCheckedException {
         // Treat empty uri and userName arguments as nulls to improve configuration usability:
         if (F.isEmpty(uri))
             uri = null;
@@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         if (F.isEmpty(userName))
             userName = null;
 
+        this.dfltUserName = IgfsUtils.fixUserName(userName);
+
         try {
-            SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName);
+            this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
 
-            fileSys = secProvider.createFileSystem();
+            // File system creation for the default user name.
+            // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
+            this.dfltFs = secProvider.createFileSystem(dfltUserName);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
 
-            uri = secProvider.uri().toString();
+        assert dfltFs != null;
 
-            if (!uri.endsWith("/"))
-                uri += "/";
+        uri = secProvider.uri().toString();
 
-            if (cfgPath != null)
-                props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+        if (!uri.endsWith("/"))
+            uri += "/";
 
-            if (userName != null)
-                props.put(SECONDARY_FS_USER_NAME, userName);
+        if (cfgPath != null)
+            props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
 
-            props.put(SECONDARY_FS_URI, uri);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
+        props.put(SECONDARY_FS_URI, uri);
+        props.put(SECONDARY_FS_USER_NAME, dfltUserName);
     }
 
     /**
@@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @return Hadoop path.
      */
     private Path convert(IgfsPath path) {
-        URI uri = fileSys.getUri();
+        URI uri = fileSysForUser().getUri();
 
         return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
     }
@@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
      * @param detailMsg Detailed error message.
      * @return Appropriate exception.
      */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
     private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
-        boolean wrongVer = X.hasCause(e, RemoteException.class) ||
-            (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
-        return !wrongVer ? cast(detailMsg, e) :
-            new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
-                "version.", e);    }
+        return cast(detailMsg, e);
+    }
 
     /**
      * Cast IO exception to IGFS exception.
@@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean exists(IgfsPath path) {
         try {
-            return fileSys.exists(convert(path));
+            return fileSysForUser().exists(convert(path));
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
         HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
 
+        final FileSystem fileSys = fileSysForUser();
+
         try {
             if (props0.userName() != null || props0.groupName() != null)
                 fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
@@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Override public void rename(IgfsPath src, IgfsPath dest) {
         // Delegate to the secondary file system.
         try {
-            if (!fileSys.rename(convert(src), convert(dest)))
+            if (!fileSysForUser().rename(convert(src), convert(dest)))
                 throw new IgfsException("Failed to rename (secondary file system returned false) " +
                     "[src=" + src + ", dest=" + dest + ']');
         }
@@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public boolean delete(IgfsPath path, boolean recursive) {
         try {
-            return fileSys.delete(convert(path), recursive);
+            return fileSysForUser().delete(convert(path), recursive);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path) {
         try {
-            if (!fileSys.mkdirs(convert(path)))
+            if (!fileSysForUser().mkdirs(convert(path)))
                 throw new IgniteException("Failed to make directories [path=" + path + "]");
         }
         catch (IOException e) {
@@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
         try {
-            if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+            if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
                 throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
         }
         catch (IOException e) {
@@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
         try {
-            FileStatus[] statuses = fileSys.listStatus(convert(path));
+            FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
 
             if (statuses == null)
                 throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
 
     /** {@inheritDoc} */
     @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
-        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize);
+        return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
     }
 
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, boolean overwrite) {
         try {
-            return fileSys.create(convert(path), overwrite);
+            return fileSysForUser().create(convert(path), overwrite);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
             new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
 
         try {
-            return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
-                null);
+            return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+                (short)replication, blockSize, null);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
         @Nullable Map<String, String> props) {
         try {
-            return fileSys.append(convert(path), bufSize);
+            return fileSysForUser().append(convert(path), bufSize);
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     /** {@inheritDoc} */
     @Override public IgfsFile info(final IgfsPath path) {
         try {
-            final FileStatus status = fileSys.getFileStatus(convert(path));
+            final FileStatus status = fileSysForUser().getFileStatus(convert(path));
 
             if (status == null)
                 return null;
@@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
         try {
             // We don't use FileSystem#getUsed() since it counts only the files
             // in the filesystem root, not all the files recursively.
-            return fileSys.getContentSummary(new Path("/")).getSpaceConsumed();
+            return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
         }
         catch (IOException e) {
             throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
@@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<String, String> properties() {
+    @Override public Map<String, String> properties() {
         return props;
     }
 
     /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
+    @Override public void close() throws IgniteException {
+        Exception e = null;
+
         try {
-            fileSys.close();
+            dfltFs.close();
         }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
+        catch (Exception e0) {
+            e = e0;
+        }
+
+        try {
+            fileSysLazyMap.close();
+        }
+        catch (IgniteCheckedException ice) {
+            if (e == null)
+                e = ice;
         }
+
+        if (e != null)
+            throw new IgniteException(e);
     }
 
     /**
      * Gets the underlying {@link FileSystem}.
+     * This method is used solely for testing.
      * @return the underlying Hadoop {@link FileSystem}.
      */
     public FileSystem fileSystem() {
-        return fileSys;
+        return fileSysForUser();
+    }
+
+    /**
+     * Gets the FileSystem for the current context user.
+     * @return the FileSystem instance, never null.
+     */
+    private FileSystem fileSysForUser() {
+        String user = IgfsUserContext.currentUser();
+
+        if (F.isEmpty(user))
+            user = dfltUserName; // default is never empty.
+
+        assert !F.isEmpty(user);
+
+        if (F.eq(user, dfltUserName))
+            return dfltFs; // optimization
+
+        return fileSysLazyMap.getOrCreate(user);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1f53a06..c0a9ade 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -97,21 +98,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>(){
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
+    /** working directory. */
+    private Path workingDir;
 
     /** Default replication factor. */
     private short dfltReplication;
@@ -129,6 +117,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
     /** Secondary URI string. */
     private URI secondaryUri;
 
+    /** The user name this file system was created on behalf of. */
+    private String user;
+
     /** IGFS mode resolver. */
     private IgfsModeResolver modeRslvr;
 
@@ -182,6 +173,36 @@ public class IgniteHadoopFileSystem extends FileSystem {
     }
 
     /**
+     * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+     * @return the user name, never null.
+     */
+    public static String getFsHadoopUser(Configuration cfg) throws IOException {
+        String user = null;
+
+        // -------------------------------------------
+        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
+        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
+        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
+        // ugi.doAs() closure.
+        if (cfg != null)
+            user = cfg.get(MRJobConfig.USER_NAME);
+        // -------------------------------------------
+
+        if (user == null) {
+            UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+            if (currUgi != null)
+                user = currUgi.getShortUserName();
+        }
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        return user;
+    }
+
+    /**
      * Public setter that can be used by direct users of FS or Visor.
      *
      * @param colocateFileWrites Whether all ongoing file writes should be colocated.
@@ -221,7 +242,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             uriAuthority = uri.getAuthority();
 
-            setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+            user = getFsHadoopUser(cfg);
 
             // Override sequential reads before prefetch if needed.
             seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -244,7 +265,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -289,13 +310,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createFileSystem(user);
 
-                    secondaryFs = secProvider.createFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -306,6 +326,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
                             "will have no effect): " + e.getMessage());
                 }
             }
+
+            // set working directory to the home directory of the current Fs user:
+            setWorkingDirectory(null);
         }
         finally {
             leaveBusy();
@@ -849,22 +872,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
     /** {@inheritDoc} */
     @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
+        Path path = new Path("/user/" + user);
 
         return path.makeQualified(getUri(), null);
     }
 
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(null);
-    }
-
     /** {@inheritDoc} */
     @Override public void setWorkingDirectory(Path newPath) {
         if (newPath == null) {
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(homeDir));
 
-            workingDir.set(homeDir);
+            workingDir = homeDir;
         }
         else {
             Path fixedNewPath = fixRelativePart(newPath);
@@ -886,13 +898,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
             if (secondaryFs != null)
                 secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
 
-            workingDir.set(fixedNewPath);
+            workingDir = fixedNewPath;
         }
     }
 
     /** {@inheritDoc} */
     @Override public Path getWorkingDirectory() {
-        return workingDir.get();
+        return workingDir;
     }
 
     /** {@inheritDoc} */
@@ -1153,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
             return null;
 
         return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
-            new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+            new IgfsPath(convert(workingDir), path.toUri().getPath());
     }
 
     /**
@@ -1191,9 +1203,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
      */
     @SuppressWarnings("deprecation")
     private FileStatus convert(IgfsFile file) {
-        return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
-            file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
+        return new FileStatus(
+            file.length(),
+            file.isDirectory(),
+            getDefaultReplication(),
+            file.groupBlockSize(),
+            file.modificationTime(),
+            file.accessTime(),
+            permission(file),
+            file.property(PROP_USER_NAME, user),
+            file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
                 return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
@@ -1247,4 +1266,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
+
+    /**
+     * Returns the user name this File System is created on behalf of.
+     * @return the user name
+     */
+    public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 9cfb79b..f3fbe9c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
@@ -40,6 +39,7 @@ import java.util.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
@@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     /** Grid remote client. */
     private HadoopIgfsWrapper rmtClient;
 
+    /** The name of the user this File System created on behalf of. */
+    private final String user;
+
     /** Working directory. */
     private IgfsPath workingDir;
 
     /** URI. */
-    private URI uri;
+    private final URI uri;
 
     /** Authority. */
     private String uriAuthority;
@@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
         uri = name;
 
+        user = getFsHadoopUser(cfg);
+
         try {
             initialize(name, cfg);
         }
@@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             throw e;
         }
 
-        workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+        workingDir = new IgfsPath("/user/" + user);
     }
 
     /** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
 
-            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
 
                 String secUri = props.get(SECONDARY_FS_URI);
                 String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
-                String secUserName = props.get(SECONDARY_FS_USER_NAME);
 
                 try {
-                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
-                        secUserName);
+                    SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+                    secondaryFs = secProvider.createAbstractFileSystem(user);
 
-                    secondaryFs = secProvider.createAbstractFileSystem();
                     secondaryUri = secProvider.uri();
                 }
                 catch (IOException e) {
@@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
             file.modificationTime(),
             file.accessTime(),
             permission(file),
-            file.property(PROP_USER_NAME, DFLT_USER_NAME),
+            file.property(PROP_USER_NAME, user),
             file.property(PROP_GROUP_NAME, "users"),
             convert(file.path())) {
             @Override public String toString() {
@@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
     @Override public String toString() {
         return S.toString(IgniteHadoopFileSystem.class, this);
     }
-}
+
+    /**
+     * Returns the user name this File System is created on behalf of.
+     * @return the user name
+     */
+    public String user() {
+        return user;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..d493bd4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -126,11 +126,15 @@ public class HadoopUtils {
                 break;
 
             case PHASE_REDUCE:
-                assert status.totalReducerCnt() > 0;
-
+                // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
+                // See https://issues.apache.org/jira/browse/IGNITE-764
                 setupProgress = 1;
                 mapProgress = 1;
-                reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+                if (status.totalReducerCnt() > 0)
+                    reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+                else
+                    reduceProgress = 1f;
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..b1a057c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 
 /**
  * Encapsulates logic of secondary filesystem creation.
@@ -36,9 +39,6 @@ public class SecondaryFileSystemProvider {
     /** The secondary filesystem URI, never null. */
     private final URI uri;
 
-    /** Optional user name to log into secondary filesystem with. */
-    private @Nullable final String userName;
-
     /**
      * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
      * specified either explicitly or in the configuration provided.
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider {
      * property in the provided configuration.
      * @param secConfPath the secondary Fs path (file path on the local file system, optional).
      * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
-     * @param userName User name.
      * @throws IOException
      */
     public SecondaryFileSystemProvider(final @Nullable String secUri,
-        final @Nullable String secConfPath, @Nullable String userName) throws IOException {
-        this.userName = userName;
-
+        final @Nullable String secConfPath) throws IOException {
         if (secConfPath != null) {
             URL url = U.resolveIgniteUrl(secConfPath);
 
@@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider {
      * @return {@link org.apache.hadoop.fs.FileSystem}  instance for this secondary Fs.
      * @throws IOException
      */
-    public FileSystem createFileSystem() throws IOException {
+    public FileSystem createFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
         final FileSystem fileSys;
 
-        if (userName == null)
-            fileSys = FileSystem.get(uri, cfg);
-        else {
-            try {
-                fileSys = FileSystem.get(uri, cfg, userName);
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+        try {
+           fileSys = FileSystem.get(uri, cfg, userName);
+        }
+        catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
 
-                throw new IOException("Failed to create file system due to interrupt.", e);
-            }
+           throw new IOException("Failed to create file system due to interrupt.", e);
         }
 
         return fileSys;
@@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider {
 
     /**
      * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
-     * @throws IOException
+     * @throws IOException in case of error.
      */
-    public AbstractFileSystem createAbstractFileSystem() throws IOException {
-        return AbstractFileSystem.get(uri, cfg);
+    public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+        userName = IgfsUtils.fixUserName(userName);
+
+        String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
+                @Override public AbstractFileSystem run() throws IOException {
+                    return AbstractFileSystem.get(uri, cfg);
+                }
+            });
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+
+            throw new IOException("Failed to create file system due to interrupt.", ie);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
deleted file mode 100644
index 509f443..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
-    /** User name for each thread. */
-    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
-        /** {@inheritDoc} */
-        @Override protected String initialValue() {
-            return DFLT_USER_NAME;
-        }
-    };
-
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
-        /** {@inheritDoc} */
-        @Override protected Path initialValue() {
-            return getHomeDirectory();
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
-    }
-
-    /**
-     * Set user name and default working directory for current thread.
-     *
-     * @param userName User name.
-     */
-    public void setUser(String userName) {
-        this.userName.set(userName);
-
-        setWorkingDirectory(getHomeDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        Path path = new Path("/user/" + userName.get());
-
-        return path.makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        Path fixedDir = fixRelativePart(dir);
-
-        String res = fixedDir.toUri().getPath();
-
-        if (!DFSUtil.isValidName(res))
-            throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
-        workingDir.set(fixedDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workingDir.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
 
 /**
  * Utilities for configuring file systems to support the separate working directory per each thread.
@@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils {
     public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
 
     /**
-     * Set user name and default working directory for current thread if it's supported by file system.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgniteHadoopFileSystem)
-            ((IgniteHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof HadoopDistributedFileSystem)
-            ((HadoopDistributedFileSystem)fs).setUser(userName);
-    }
-
-    /**
      * Setup wrappers of filesystems to support the separate working directory.
      *
      * @param cfg Config for setup.
@@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils {
         cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
         cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
                 HadoopLocalFileSystemV2.class.getName());
-
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
      * @throws IOException If failed.
      */
     public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+    /**
+     * The user this Igfs instance works on behalf of.
+     * @return the user name.
+     */
+    public String user();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..47ba0e8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** Logger. */
     private final Log log;
 
+    /** The user this Igfs works on behalf of. */
+    private final String user;
+
     /**
      * Constructor.
      *
      * @param igfs Target IGFS.
      * @param log Log.
      */
-    public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+    public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+        this.user = IgfsUtils.fixUserName(userName);
+
         this.igfs = igfs;
+
         this.log = log;
 
         bufSize = igfs.configuration().getBlockSize() * 2;
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) {
-        igfs.clientLogDirectory(logDir);
+    @Override public IgfsHandshakeResponse handshake(final String logDir) {
+        return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+            @Override public IgfsHandshakeResponse apply() {
+                igfs.clientLogDirectory(logDir);
 
-        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
-            igfs.globalSampling());
+                return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+                    igfs.globalSampling());
+                }
+         });
     }
 
     /** {@inheritDoc} */
@@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.info(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.info(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            return igfs.update(path, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+                @Override public IgfsFile apply() {
+                    return igfs.update(path, props);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
         try {
-            igfs.setTimes(path, accessTime, modificationTime);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.setTimes(path, accessTime, modificationTime);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
         try {
-            igfs.rename(src, dest);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.rename(src, dest);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
         try {
-            return igfs.delete(path, recursive);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+                @Override public Boolean apply() {
+                    return igfs.delete(path, recursive);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     /** {@inheritDoc} */
     @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
         try {
-            return igfs.globalSpace();
+            return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+                @Override public IgfsStatus call() throws IgniteCheckedException {
+                    return igfs.globalSpace();
+                }
+            });
         }
         catch (IllegalStateException e) {
             throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
                 "stopping.");
         }
+        catch (IgniteCheckedException | RuntimeException | Error e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new AssertionError("Must never go there.");
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listPaths(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+                @Override public Collection<IgfsPath> apply() {
+                    return igfs.listPaths(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.listFiles(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+                @Override public Collection<IgfsFile> apply() {
+                    return igfs.listFiles(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
         try {
-            igfs.mkdirs(path, props);
+            IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+                @Override public Void apply() {
+                    igfs.mkdirs(path, props);
+
+                    return null;
+                }
+            });
 
             return true;
         }
@@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
         try {
-            return igfs.summary(path);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+                @Override public IgfsPathSummary apply() {
+                    return igfs.summary(path);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
         throws IgniteCheckedException {
         try {
-            return igfs.affinity(path, start, len);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+                @Override public Collection<IgfsBlockLocation> apply() {
+                    return igfs.affinity(path, start, len);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
         throws IgniteCheckedException {
         try {
-            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
 
-            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+        final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
-                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+                        colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
+    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+        final @Nullable Map<String, String> props) throws IgniteCheckedException {
         try {
-            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+            return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+                @Override public HadoopIgfsStreamDelegate apply() {
+                    IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
 
-            return new HadoopIgfsStreamDelegate(this, stream);
+                    return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);
@@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
         if (lsnr0 != null && log.isDebugEnabled())
             log.debug("Removed stream event listener [delegate=" + delegate + ']');
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 0264e7b..3561e95 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public class HadoopIgfsIpcIo implements HadoopIgfsIo {
     /** Logger. */
-    private Log log;
+    private final Log log;
 
     /** Request futures map. */
     private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =


[07/10] incubator-ignite git commit: #gg-10369: Fix exception when we start ignite and gridgain nodes.

Posted by se...@apache.org.
#gg-10369: Fix exception when we start ignite and gridgain nodes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5df06682
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5df06682
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5df06682

Branch: refs/heads/ignite-943
Commit: 5df06682c517731b3811ca4d0daabaa504b732f3
Parents: 8455c7a
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri May 29 16:57:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri May 29 16:57:30 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/GridJavaProcess.java   | 30 ++++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5df06682/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index bff26ec..42fe089 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -128,25 +128,31 @@ public final class GridJavaProcess {
         gjProc.log = log;
         gjProc.procKilledC = procKilledC;
 
-        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
-        String classpath = System.getProperty("java.class.path");
-        String sfcp = System.getProperty("surefire.test.class.path");
-
-        if (sfcp != null)
-            classpath += System.getProperty("path.separator") + sfcp;
-
-        if (cp != null)
-            classpath += System.getProperty("path.separator") + cp;
-
         List<String> procParams = params == null || params.isEmpty() ?
             Collections.<String>emptyList() : Arrays.asList(params.split(" "));
 
         List<String> procCommands = new ArrayList<>();
 
+        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+
         procCommands.add(javaBin);
         procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);
-        procCommands.add("-cp");
-        procCommands.add(classpath);
+
+        if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) {
+            String classpath = System.getProperty("java.class.path");
+
+            String sfcp = System.getProperty("surefire.test.class.path");
+
+            if (sfcp != null)
+                classpath += System.getProperty("path.separator") + sfcp;
+
+            if (cp != null)
+                classpath += System.getProperty("path.separator") + cp;
+
+            procCommands.add("-cp");
+            procCommands.add(classpath);
+        }
+
         procCommands.add(clsName);
         procCommands.addAll(procParams);
 


[02/10] incubator-ignite git commit: ignite-37 Improve offheap metrics for cache

Posted by se...@apache.org.
ignite-37 Improve offheap metrics for cache


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3f012b77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3f012b77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3f012b77

Branch: refs/heads/ignite-943
Commit: 3f012b77699405c06e5014ffe5ba4a7d4e7ab5f3
Parents: 036d68d
Author: agura <ag...@gridgain.com>
Authored: Thu Apr 30 20:53:53 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed May 27 15:02:39 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMetrics.java   | 187 +++++++--
 .../internal/managers/GridManagerAdapter.java   |  59 +--
 .../processors/cache/CacheMetricsImpl.java      | 305 +++++++++++++-
 .../cache/CacheMetricsMXBeanImpl.java           | 100 +++++
 .../processors/cache/CacheMetricsSnapshot.java  | 380 +++++++++++++----
 .../processors/cache/GridCacheSwapManager.java  | 118 ++++--
 .../ignite/mxbean/CacheMetricsMXBean.java       |  80 ++++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  35 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  47 ---
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |   8 +-
 ...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 412 +++++++++++++++++++
 .../testframework/GridSpiTestContext.java       |  25 +-
 .../IgniteCacheMetricsSelfTestSuite.java        |   1 +
 13 files changed, 1457 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 0d87326..799aace 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -30,21 +30,21 @@ public interface CacheMetrics {
     /**
      * The number of get requests that were satisfied by the cache.
      *
-     * @return the number of hits
+     * @return The number of hits.
      */
     public long getCacheHits();
 
     /**
      * This is a measure of cache efficiency.
      *
-     * @return the percentage of successful hits, as a decimal e.g 75.
+     * @return The percentage of successful hits, as a decimal e.g 75.
      */
     public float getCacheHitPercentage();
 
     /**
      * A miss is a get request that is not satisfied.
      *
-     * @return the number of misses
+     * @return The number of misses.
      */
     public long getCacheMisses();
 
@@ -52,7 +52,7 @@ public interface CacheMetrics {
      * Returns the percentage of cache accesses that did not find a requested entry
      * in the cache.
      *
-     * @return the percentage of accesses that failed to find anything
+     * @return The percentage of accesses that failed to find anything.
      */
     public float getCacheMissPercentage();
 
@@ -60,14 +60,14 @@ public interface CacheMetrics {
      * The total number of requests to the cache. This will be equal to the sum of
      * the hits and misses.
      *
-     * @return the number of gets
+     * @return The number of gets.
      */
     public long getCacheGets();
 
     /**
      * The total number of puts to the cache.
      *
-     * @return the number of puts
+     * @return The number of puts.
      */
     public long getCachePuts();
 
@@ -75,7 +75,7 @@ public interface CacheMetrics {
      * The total number of removals from the cache. This does not include evictions,
      * where the cache itself initiates the removal to make space.
      *
-     * @return the number of removals
+     * @return The number of removals.
      */
     public long getCacheRemovals();
 
@@ -84,28 +84,28 @@ public interface CacheMetrics {
      * initiated by the cache itself to free up space. An eviction is not treated as
      * a removal and does not appear in the removal counts.
      *
-     * @return the number of evictions
+     * @return The number of evictions.
      */
     public long getCacheEvictions();
 
     /**
      * The mean time to execute gets.
      *
-     * @return the time in µs
+     * @return The time in µs.
      */
     public float getAverageGetTime();
 
     /**
      * The mean time to execute puts.
      *
-     * @return the time in µs
+     * @return The time in µs.
      */
     public float getAveragePutTime();
 
     /**
      * The mean time to execute removes.
      *
-     * @return the time in µs
+     * @return The time in µs.
      */
     public float getAverageRemoveTime();
 
@@ -113,7 +113,7 @@ public interface CacheMetrics {
     /**
      * The mean time to execute tx commit.
      *
-     * @return the time in µs
+     * @return The time in µs.
      */
     public float getAverageTxCommitTime();
 
@@ -124,7 +124,6 @@ public interface CacheMetrics {
      */
     public float getAverageTxRollbackTime();
 
-
     /**
      * Gets total number of transaction commits.
      *
@@ -154,6 +153,62 @@ public interface CacheMetrics {
     public long getOverflowSize();
 
     /**
+     * The total number of get requests to the off-heap memory.
+     *
+     * @return The number of gets.
+     */
+    public long getOffHeapGets();
+
+    /**
+     * The total number of put requests to the off-heap memory.
+     *
+     * @return The number of puts.
+     */
+    public long getOffHeapPuts();
+
+    /**
+     * The total number of removals from the off-heap memory. This does not include evictions.
+     *
+     * @return The number of removals.
+     */
+    public long getOffHeapRemovals();
+
+    /**
+     * The total number of evictions from the off-heap memory.
+     *
+     * @return The number of evictions.
+     */
+    public long getOffHeapEvictions();
+
+    /**
+     * The number of get requests that were satisfied by the off-heap memory.
+     *
+     * @return The off-heap hits number.
+     */
+    public long getOffHeapHits();
+
+    /**
+     * Gets the percentage of hits on off-heap memory.
+     *
+     * @return The percentage of hits on off-heap memory.
+     */
+    public float getOffHeapHitPercentage();
+
+    /**
+     * A miss is a get request that is not satisfied by off-heap memory.
+     *
+     * @return The off-heap misses number.
+     */
+    public long getOffHeapMisses();
+
+    /**
+     * Gets the percentage of misses on off-heap memory.
+     *
+     * @return The percentage of misses on off-heap memory.
+     */
+    public float getOffHeapMissPercentage();
+
+    /**
      * Gets number of entries stored in off-heap memory.
      *
      * @return Number of entries stored in off-heap memory.
@@ -161,6 +216,20 @@ public interface CacheMetrics {
     public long getOffHeapEntriesCount();
 
     /**
+     * Gets number of primary entries stored in off-heap memory.
+     *
+     * @return Number of primary entries stored in off-heap memory.
+     */
+    public long getOffHeapPrimaryEntriesCount();
+
+    /**
+     * Gets number of backup entries stored in off-heap memory.
+     *
+     * @return Number of backup entries stored in off-heap memory.
+     */
+    public long getOffHeapBackupEntriesCount();
+
+    /**
      * Gets memory size allocated in off-heap.
      *
      * @return Memory size allocated in off-heap.
@@ -168,6 +237,76 @@ public interface CacheMetrics {
     public long getOffHeapAllocatedSize();
 
     /**
+     * Gets off-heap memory maximum size.
+     *
+     * @return Off-heap memory maximum size.
+     */
+    public long getOffHeapMaxSize();
+
+    /**
+     * The total number of get requests to the swap.
+     *
+     * @return The number of gets.
+     */
+    public long getSwapGets();
+
+    /**
+     * The total number of put requests to the swap.
+     *
+     * @return The number of puts.
+     */
+    public long getSwapPuts();
+
+    /**
+     * The total number of removals from the swap.
+     *
+     * @return The number of removals.
+     */
+    public long getSwapRemovals();
+
+    /**
+     * The number of get requests that were satisfied by the swap.
+     *
+     * @return The swap hits number.
+     */
+    public long getSwapHits();
+
+    /**
+     * A miss is a get request that is not satisfied by swap.
+     *
+     * @return The swap misses number.
+     */
+    public long getSwapMisses();
+
+    /**
+     * Gets number of entries stored in swap.
+     *
+     * @return Number of entries stored in swap.
+     */
+    public long getSwapEntriesCount();
+
+    /**
+     * Gets size of swap.
+     *
+     * @return Size of swap.
+     */
+    public long getSwapSize();
+
+    /**
+     * Gets the percentage of hits on swap.
+     *
+     * @return The percentage of hits on swap.
+     */
+    public float getSwapHitPercentage();
+
+    /**
+     * Gets the percentage of misses on swap.
+     *
+     * @return The percentage of misses on swap.
+     */
+    public float getSwapMissPercentage();
+
+    /**
      * Gets number of non-{@code null} values in the cache.
      *
      * @return Number of non-{@code null} values in the cache.
@@ -184,7 +323,7 @@ public interface CacheMetrics {
     /**
      * Returns {@code true} if this cache is empty.
      *
-     * @return {@code true} if this cache is empty.
+     * @return {@code True} if this cache is empty.
      */
     public boolean isEmpty();
 
@@ -294,7 +433,7 @@ public interface CacheMetrics {
     public int getTxDhtRolledbackVersionsSize();
 
     /**
-     * Returns {@code True} if write-behind is enabled.
+     * Returns {@code true} if write-behind is enabled.
      *
      * @return {@code True} if write-behind is enabled.
      */
@@ -372,16 +511,16 @@ public interface CacheMetrics {
     /**
      * Determines the required type of keys for this {@link Cache}, if any.
      *
-     * @return the fully qualified class name of the key type,
-     * or "java.lang.Object" if the type is undefined.
+     * @return The fully qualified class name of the key type,
+     * or {@code "java.lang.Object"} if the type is undefined.
      */
     public String getKeyType();
 
     /**
      * Determines the required type of values for this {@link Cache}, if any.
      *
-     * @return the fully qualified class name of the value type,
-     *         or "java.lang.Object" if the type is undefined.
+     * @return The fully qualified class name of the value type,
+     *         or {@code "java.lang.Object"} if the type is undefined.
      */
     public String getValueType();
 
@@ -407,7 +546,7 @@ public interface CacheMetrics {
      * <p>
      * The default value is {@code true}.
      *
-     * @return true if the cache is store by value
+     * @return {@code True} if the cache is store by value.
      */
     public boolean isStoreByValue();
 
@@ -416,7 +555,7 @@ public interface CacheMetrics {
      * <p>
      * The default value is {@code false}.
      *
-     * @return true if statistics collection is enabled
+     * @return {@code True} if statistics collection is enabled.
      */
     public boolean isStatisticsEnabled();
 
@@ -425,7 +564,7 @@ public interface CacheMetrics {
      * <p>
      * The default value is {@code false}.
      *
-     * @return true if management is enabled
+     * @return {@code true} if management is enabled.
      */
     public boolean isManagementEnabled();
 
@@ -434,7 +573,7 @@ public interface CacheMetrics {
      * <p>
      * The default value is {@code false}
      *
-     * @return {@code true} when a {@link Cache} is in
+     * @return {@code True} when a {@link Cache} is in
      *         "read-through" mode.
      * @see CacheLoader
      */
@@ -448,7 +587,7 @@ public interface CacheMetrics {
      * <p>
      * The default value is {@code false}
      *
-     * @return {@code true} when a {@link Cache} is in "write-through" mode.
+     * @return {@code True} when a {@link Cache} is in "write-through" mode.
      * @see CacheWriter
      */
     public boolean isWriteThrough();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index c93c059..1eb7143 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,7 +23,6 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -31,7 +30,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
 import org.jetbrains.annotations.*;
 
 import javax.cache.expiry.*;
@@ -439,46 +438,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.cache().cache(cacheName).containsKey(key);
                     }
 
-                    @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
-                        @Nullable ClassLoader ldr) {
-                        assert ctx.swap().enabled();
-
-                        try {
-                            ctx.swap().write(spaceName, key, val, ldr);
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
-                    @SuppressWarnings({"unchecked"})
-                    @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key,
-                        @Nullable ClassLoader ldr) {
-                        try {
-                            assert ctx.swap().enabled();
-
-                            return ctx.swap().readValue(spaceName, key, ldr);
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
                     @Override public int partition(String cacheName, Object key) {
                         return ctx.cache().cache(cacheName).affinity().partition(key);
                     }
 
-                    @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
-                        try {
-                            assert ctx.swap().enabled();
-
-                            ctx.swap().remove(spaceName, key, null, ldr);
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
                     @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
                         for (GridComponent comp : ctx) {
                             IgniteNodeValidationResult err = comp.validateNode(node);
@@ -508,26 +471,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         }
                     }
 
-                    @SuppressWarnings("unchecked")
-                    @Nullable @Override public <V> V readValueFromOffheapAndSwap(@Nullable String spaceName,
-                        Object key, @Nullable ClassLoader ldr) {
-                        try {
-                            IgniteInternalCache<Object, V> cache = ctx.cache().cache(spaceName);
-
-                            GridCacheContext cctx = cache.context();
-
-                            if (cctx.isNear())
-                                cctx = cctx.near().dht().context();
-
-                            GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
-
-                            return e != null ? CU.<V>value(e.value(), cctx, true) : null;
-                        }
-                        catch (IgniteCheckedException e) {
-                            throw U.convertException(e);
-                        }
-                    }
-
                     @Override public MessageFormatter messageFormatter() {
                         return ctx.io().formatter();
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..3dcda3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+
 /**
  * Adapter for cache metrics.
  */
@@ -63,7 +65,7 @@ public class CacheMetricsImpl implements CacheMetrics {
     private AtomicLong getTimeNanos = new AtomicLong();
 
     /** Remove time taken nanos. */
-    private AtomicLong removeTimeNanos = new AtomicLong();
+    private AtomicLong rmvTimeNanos = new AtomicLong();
 
     /** Commit transaction time taken nanos. */
     private AtomicLong commitTimeNanos = new AtomicLong();
@@ -71,6 +73,39 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** Commit transaction time taken nanos. */
     private AtomicLong rollbackTimeNanos = new AtomicLong();
 
+    /** Number of reads from off-heap memory. */
+    private AtomicLong offHeapGets = new AtomicLong();
+
+    /** Number of writes to off-heap memory. */
+    private AtomicLong offHeapPuts = new AtomicLong();
+
+    /** Number of removed entries from off-heap memory. */
+    private AtomicLong offHeapRemoves = new AtomicLong();
+
+    /** Number of evictions from off-heap memory. */
+    private AtomicLong offHeapEvicts = new AtomicLong();
+
+    /** Number of off-heap hits. */
+    private AtomicLong offHeapHits = new AtomicLong();
+
+    /** Number of off-heap misses. */
+    private AtomicLong offHeapMisses = new AtomicLong();
+
+    /** Number of reads from swap. */
+    private AtomicLong swapGets = new AtomicLong();
+
+    /** Number of writes to swap. */
+    private AtomicLong swapPuts = new AtomicLong();
+
+    /** Number of removed entries from swap. */
+    private AtomicLong swapRemoves = new AtomicLong();
+
+    /** Number of swap hits. */
+    private AtomicLong swapHits = new AtomicLong();
+
+    /** Number of swap misses. */
+    private AtomicLong swapMisses = new AtomicLong();
+
     /** Cache metrics. */
     @GridToStringExclude
     private transient CacheMetricsImpl delegate;
@@ -126,16 +161,160 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapGets() {
+        return offHeapGets.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapPuts() {
+        return offHeapPuts.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapRemovals() {
+        return offHeapRemoves.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapEvictions() {
+        return offHeapEvicts.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapHits() {
+        return offHeapHits.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getOffHeapHitPercentage() {
+        long hits0 = offHeapHits.get();
+        long gets0 = offHeapGets.get();
+
+        if (hits0 == 0)
+            return 0;
+
+        return (float) hits0 / gets0 * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapMisses() {
+        return offHeapMisses.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getOffHeapMissPercentage() {
+        long misses0 = offHeapMisses.get();
+        long reads0 = offHeapGets.get();
+
+        if (misses0 == 0)
+            return 0;
+
+        return (float) misses0 / reads0 * 100.0f;
+    }
+
+    /** {@inheritDoc} */
     @Override public long getOffHeapEntriesCount() {
         return cctx.cache().offHeapEntriesCount();
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapPrimaryEntriesCount() {
+        try {
+            return cctx.swap().offheapEntriesCount(true, false, NONE);
+        }
+        catch (IgniteCheckedException e) {
+            return 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapBackupEntriesCount() {
+        try {
+            return cctx.swap().offheapEntriesCount(false, true, NONE);
+        }
+        catch (IgniteCheckedException e) {
+            return 0;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public long getOffHeapAllocatedSize() {
         return cctx.cache().offHeapAllocatedSize();
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapMaxSize() {
+        return cctx.config().getOffHeapMaxMemory();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapGets() {
+        return swapGets.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapPuts() {
+        return swapPuts.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapRemovals() {
+        return swapRemoves.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapHits() {
+        return swapHits.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapMisses() {
+        return swapMisses.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapEntriesCount() {
+        try {
+            return cctx.cache().swapKeys();
+        }
+        catch (IgniteCheckedException e) {
+            return 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapSize() {
+        try {
+            return cctx.cache().swapSize();
+        }
+        catch (IgniteCheckedException e) {
+            return 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getSwapHitPercentage() {
+        long hits0 = swapHits.get();
+        long gets0 = swapGets.get();
+
+        if (hits0 == 0)
+            return 0;
+
+        return (float) hits0 / gets0 * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getSwapMissPercentage() {
+        long misses0 = swapMisses.get();
+        long reads0 = swapGets.get();
+
+        if (misses0 == 0)
+            return 0;
+
+        return (float) misses0 / reads0 * 100.0f;
+    }
+
+    /** {@inheritDoc} */
     @Override public int getSize() {
         return cctx.cache().size();
     }
@@ -317,11 +496,24 @@ public class CacheMetricsImpl implements CacheMetrics {
         txCommits.set(0);
         txRollbacks.set(0);
         putTimeNanos.set(0);
-        removeTimeNanos.set(0);
+        rmvTimeNanos.set(0);
         getTimeNanos.set(0);
         commitTimeNanos.set(0);
         rollbackTimeNanos.set(0);
 
+        offHeapGets.set(0);
+        offHeapPuts.set(0);
+        offHeapRemoves.set(0);
+        offHeapHits.set(0);
+        offHeapMisses.set(0);
+        offHeapEvicts.set(0);
+
+        swapGets.set(0);
+        swapPuts.set(0);
+        swapRemoves.set(0);
+        swapHits.set(0);
+        swapMisses.set(0);
+
         if (delegate != null)
             delegate.clear();
     }
@@ -402,7 +594,7 @@ public class CacheMetricsImpl implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public float getAverageRemoveTime() {
-        long timeNanos = removeTimeNanos.get();
+        long timeNanos = rmvTimeNanos.get();
         long removesCnt = rmCnt.get();
 
         if (timeNanos == 0 || removesCnt == 0)
@@ -483,7 +675,6 @@ public class CacheMetricsImpl implements CacheMetrics {
             delegate.onTxRollback(duration);
     }
 
-
     /**
      * Increments the get time accumulator.
      *
@@ -514,7 +705,7 @@ public class CacheMetricsImpl implements CacheMetrics {
      * @param duration the time taken in nanoseconds.
      */
     public void addRemoveTimeNanos(long duration) {
-        removeTimeNanos.addAndGet(duration);
+        rmvTimeNanos.addAndGet(duration);
 
         if (delegate != null)
             delegate.addRemoveTimeNanos(duration);
@@ -526,7 +717,7 @@ public class CacheMetricsImpl implements CacheMetrics {
      * @param duration the time taken in nanoseconds.
      */
     public void addRemoveAndGetTimeNanos(long duration) {
-        removeTimeNanos.addAndGet(duration);
+        rmvTimeNanos.addAndGet(duration);
         getTimeNanos.addAndGet(duration);
 
         if (delegate != null)
@@ -581,6 +772,108 @@ public class CacheMetricsImpl implements CacheMetrics {
         return cctx.config().isManagementEnabled();
     }
 
+    /**
+     * Off-heap read callback.
+     *
+     * @param hit Hit or miss flag.
+     */
+    public void onOffHeapRead(boolean hit) {
+        offHeapGets.incrementAndGet();
+
+        if (hit)
+            offHeapHits.incrementAndGet();
+        else
+            offHeapMisses.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onOffHeapRead(hit);
+    }
+
+    /**
+     * Off-heap write callback.
+     */
+    public void onOffHeapWrite() {
+        offHeapPuts.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onOffHeapWrite();
+    }
+
+    /**
+     * Off-heap remove callback.
+     */
+    public void onOffHeapRemove() {
+        offHeapRemoves.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onOffHeapRemove();
+    }
+
+    /**
+     * Off-heap evict callback.
+     */
+    public void onOffHeapEvict() {
+        offHeapEvicts.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onOffHeapRemove();
+    }
+
+    /**
+     * Swap read callback.
+     *
+     * @param hit Hit or miss flag.
+     */
+    public void onSwapRead(boolean hit) {
+        swapGets.incrementAndGet();
+
+        if (hit)
+            swapHits.incrementAndGet();
+        else
+            swapMisses.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onSwapRead(hit);
+    }
+
+    /**
+     * Swap write callback.
+     */
+    public void onSwapWrite() {
+        onSwapWrite(1);
+    }
+
+    /**
+     * Swap write callback.
+     *
+     * @param cnt Amount of entries.
+     */
+    public void onSwapWrite(int cnt) {
+        swapPuts.addAndGet(cnt);
+
+        if (delegate != null)
+            delegate.onSwapWrite(cnt);
+    }
+
+    /**
+     * Swap remove callback.
+     */
+    public void onSwapRemove() {
+        onSwapRemove(1);
+    }
+
+    /**
+     * Swap remove callback.
+     *
+     * @param cnt Amount of entries.
+     */
+    public void onSwapRemove(int cnt) {
+        swapRemoves.addAndGet(cnt);
+
+        if (delegate != null)
+            delegate.onSwapRemove(cnt);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheMetricsImpl.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
index e9d547c..966027a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
@@ -49,16 +49,116 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapGets() {
+        return cache.metrics0().getOffHeapGets();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapPuts() {
+        return cache.metrics0().getOffHeapPuts();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapRemovals() {
+        return cache.metrics0().getOffHeapRemovals();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapEvictions() {
+        return cache.metrics0().getOffHeapEvictions();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapHits() {
+        return cache.metrics0().getOffHeapHits();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getOffHeapHitPercentage() {
+        return cache.metrics0().getOffHeapHitPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapMisses() {
+        return cache.metrics0().getOffHeapMisses();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getOffHeapMissPercentage() {
+        return cache.metrics0().getOffHeapMissPercentage();
+    }
+
+    /** {@inheritDoc} */
     @Override public long getOffHeapEntriesCount() {
         return cache.metrics0().getOffHeapEntriesCount();
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapPrimaryEntriesCount() {
+        return cache.metrics0().getOffHeapPrimaryEntriesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapBackupEntriesCount() {
+        return cache.metrics0().getOffHeapBackupEntriesCount();
+    }
+
+    /** {@inheritDoc} */
     @Override public long getOffHeapAllocatedSize() {
         return cache.metrics0().getOffHeapAllocatedSize();
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapMaxSize() {
+        return cache.metrics0().getOffHeapMaxSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapGets() {
+        return cache.metrics0().getSwapGets();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapPuts() {
+        return cache.metrics0().getSwapPuts();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapRemovals() {
+        return cache.metrics0().getSwapRemovals();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapHits() {
+        return cache.metrics0().getSwapHits();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapMisses() {
+        return cache.metrics0().getSwapMisses();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getSwapHitPercentage() {
+        return cache.metrics0().getSwapHitPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getSwapMissPercentage() {
+        return cache.metrics0().getSwapMissPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapEntriesCount() {
+        return cache.metrics0().getSwapEntriesCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapSize() {
+        return cache.metrics0().getSwapSize();
+    }
+
+    /** {@inheritDoc} */
     @Override public int getSize() {
         return cache.metrics0().getSize();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 4fe152a..cf16d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -61,7 +61,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     private float getAvgTimeNanos = 0;
 
     /** Remove time taken nanos. */
-    private float removeAvgTimeNanos = 0;
+    private float rmvAvgTimeNanos = 0;
 
     /** Commit transaction time taken nanos. */
     private float commitAvgTimeNanos = 0;
@@ -75,12 +75,60 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     /** Number of entries that was swapped to disk. */
     private long overflowSize;
 
+    /** Number of reads from off-heap. */
+    private long offHeapGets;
+
+    /** Number of writes to off-heap. */
+    private long offHeapPuts;
+
+    /** Number of removed entries from off-heap. */
+    private long offHeapRemoves;
+
+    /** Number of evictions from off-heap. */
+    private long offHeapEvicts;
+
+    /** Off-heap hits number. */
+    private long offHeapHits;
+
+    /** Off-heap misses number. */
+    private long offHeapMisses;
+
     /** Number of entries stored in off-heap memory. */
-    private long offHeapEntriesCount;
+    private long offHeapEntriesCnt;
+
+    /** Number of primary entries stored in off-heap memory. */
+    private long offHeapPrimaryEntriesCnt;
+
+    /** Number of backup entries stored in off-heap memory. */
+    private long offHeapBackupEntriesCnt;
 
     /** Memory size allocated in off-heap. */
     private long offHeapAllocatedSize;
 
+    /** Off-heap memory maximum size*/
+    private long offHeapMaxSize;
+
+    /** Number of reads from swap. */
+    private long swapGets;
+
+    /** Number of writes to swap. */
+    private long swapPuts;
+
+    /** Number of removed entries from swap. */
+    private long swapRemoves;
+
+    /** Number of entries stored in swap. */
+    private long swapEntriesCnt;
+
+    /** Swap hits number. */
+    private long swapHits;
+
+    /** Swap misses number. */
+    private long swapMisses;
+
+    /** Swap size. */
+    private long swapSize;
+
     /** Number of non-{@code null} values in the cache. */
     private int size;
 
@@ -91,7 +139,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     private boolean isEmpty;
 
     /** Gets current size of evict queue used to batch up evictions. */
-    private int dhtEvictQueueCurrentSize;
+    private int dhtEvictQueueCurrSize;
 
     /** Transaction per-thread map size. */
     private int txThreadMapSize;
@@ -106,7 +154,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     private int txPrepareQueueSize;
 
     /** Start version counts map size. */
-    private int txStartVersionCountsSize;
+    private int txStartVerCountsSize;
 
     /** Number of cached committed transaction IDs. */
     private int txCommittedVersionsSize;
@@ -127,7 +175,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     private int txDhtPrepareQueueSize;
 
     /** DHT start version counts map size. */
-    private int txDhtStartVersionCountsSize;
+    private int txDhtStartVerCountsSize;
 
     /** Number of cached committed DHT transaction IDs. */
     private int txDhtCommittedVersionsSize;
@@ -142,34 +190,34 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     private int writeBehindFlushSize;
 
     /** Count of worker threads. */
-    private int writeBehindFlushThreadCount;
+    private int writeBehindFlushThreadCnt;
 
     /** Flush frequency in milliseconds. */
-    private long writeBehindFlushFrequency;
+    private long writeBehindFlushFreq;
 
     /** Maximum size of batch. */
     private int writeBehindStoreBatchSize;
 
     /** Count of cache overflow events since start. */
-    private int writeBehindTotalCriticalOverflowCount;
+    private int writeBehindTotalCriticalOverflowCnt;
 
     /** Count of cache overflow events since start. */
-    private int writeBehindCriticalOverflowCount;
+    private int writeBehindCriticalOverflowCnt;
 
     /** Count of entries in store-retry state. */
-    private int writeBehindErrorRetryCount;
+    private int writeBehindErrorRetryCnt;
 
     /** Total count of entries in cache store internal buffer. */
-    private int writeBehindBufferSize;
+    private int writeBehindBufSize;
 
     /** */
     private String keyType;
 
     /** */
-    private String valueType;
+    private String valType;
 
     /** */
-    private boolean isStoreByValue;
+    private boolean isStoreByVal;
 
     /** */
     private boolean isStatisticsEnabled;
@@ -207,45 +255,64 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
         putAvgTimeNanos = m.getAveragePutTime();
         getAvgTimeNanos = m.getAverageGetTime();
-        removeAvgTimeNanos = m.getAverageRemoveTime();
+        rmvAvgTimeNanos = m.getAverageRemoveTime();
         commitAvgTimeNanos = m.getAverageTxCommitTime();
         rollbackAvgTimeNanos = m.getAverageTxRollbackTime();
 
         cacheName = m.name();
         overflowSize = m.getOverflowSize();
-        offHeapEntriesCount = m.getOffHeapEntriesCount();
+
+        offHeapGets = m.getOffHeapGets();
+        offHeapPuts = m.getOffHeapPuts();
+        offHeapRemoves = m.getOffHeapRemovals();
+        offHeapEvicts = m.getOffHeapEvictions();
+        offHeapHits = m.getOffHeapHits();
+        offHeapMisses = m.getOffHeapMisses();
+        offHeapEntriesCnt = m.getOffHeapEntriesCount();
+        offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount();
+        offHeapBackupEntriesCnt = m.getOffHeapBackupEntriesCount();
         offHeapAllocatedSize = m.getOffHeapAllocatedSize();
+        offHeapMaxSize = m.getOffHeapMaxSize();
+
+        swapGets = m.getSwapGets();
+        swapPuts = m.getSwapPuts();
+        swapRemoves = m.getSwapRemovals();
+        swapHits = m.getSwapHits();
+        swapMisses = m.getSwapMisses();
+        swapEntriesCnt = m.getSwapEntriesCount();
+        swapSize = m.getSwapSize();
+
         size = m.getSize();
         keySize = m.getKeySize();
         isEmpty = m.isEmpty();
-        dhtEvictQueueCurrentSize = m.getDhtEvictQueueCurrentSize();
+        dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize();
         txThreadMapSize = m.getTxThreadMapSize();
         txXidMapSize = m.getTxXidMapSize();
         txCommitQueueSize = m.getTxCommitQueueSize();
         txPrepareQueueSize = m.getTxPrepareQueueSize();
-        txStartVersionCountsSize = m.getTxStartVersionCountsSize();
+        txStartVerCountsSize = m.getTxStartVersionCountsSize();
         txCommittedVersionsSize = m.getTxCommittedVersionsSize();
         txRolledbackVersionsSize = m.getTxRolledbackVersionsSize();
         txDhtThreadMapSize = m.getTxDhtThreadMapSize();
         txDhtXidMapSize = m.getTxDhtXidMapSize();
         txDhtCommitQueueSize = m.getTxDhtCommitQueueSize();
         txDhtPrepareQueueSize = m.getTxDhtPrepareQueueSize();
-        txDhtStartVersionCountsSize = m.getTxDhtStartVersionCountsSize();
+        txDhtStartVerCountsSize = m.getTxDhtStartVersionCountsSize();
         txDhtCommittedVersionsSize = m.getTxDhtCommittedVersionsSize();
         txDhtRolledbackVersionsSize = m.getTxDhtRolledbackVersionsSize();
         isWriteBehindEnabled = m.isWriteBehindEnabled();
         writeBehindFlushSize = m.getWriteBehindFlushSize();
-        writeBehindFlushThreadCount = m.getWriteBehindFlushThreadCount();
-        writeBehindFlushFrequency = m.getWriteBehindFlushFrequency();
+        writeBehindFlushThreadCnt = m.getWriteBehindFlushThreadCount();
+        writeBehindFlushFreq = m.getWriteBehindFlushFrequency();
         writeBehindStoreBatchSize = m.getWriteBehindStoreBatchSize();
-        writeBehindTotalCriticalOverflowCount = m.getWriteBehindTotalCriticalOverflowCount();
-        writeBehindCriticalOverflowCount = m.getWriteBehindCriticalOverflowCount();
-        writeBehindErrorRetryCount = m.getWriteBehindErrorRetryCount();
-        writeBehindBufferSize = m.getWriteBehindBufferSize();
+        writeBehindTotalCriticalOverflowCnt = m.getWriteBehindTotalCriticalOverflowCount();
+        writeBehindCriticalOverflowCnt = m.getWriteBehindCriticalOverflowCount();
+        writeBehindErrorRetryCnt = m.getWriteBehindErrorRetryCount();
+        writeBehindBufSize = m.getWriteBehindBufferSize();
 
         keyType = m.getKeyType();
-        valueType = m.getValueType();
-        isStoreByValue = m.isStoreByValue();
+        valType = m.getValueType();
+        isStoreByVal = m.isStoreByValue();
         isStatisticsEnabled = m.isStatisticsEnabled();
         isManagementEnabled = m.isManagementEnabled();
         isReadThrough = m.isReadThrough();
@@ -263,21 +330,23 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
         isEmpty = loc.isEmpty();
         isWriteBehindEnabled = loc.isWriteBehindEnabled();
         writeBehindFlushSize = loc.getWriteBehindFlushSize();
-        writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount();
-        writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency();
+        writeBehindFlushThreadCnt = loc.getWriteBehindFlushThreadCount();
+        writeBehindFlushFreq = loc.getWriteBehindFlushFrequency();
         writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize();
-        writeBehindBufferSize = loc.getWriteBehindBufferSize();
+        writeBehindBufSize = loc.getWriteBehindBufferSize();
         size = loc.getSize();
         keySize = loc.getKeySize();
 
         keyType = loc.getKeyType();
-        valueType = loc.getValueType();
-        isStoreByValue = loc.isStoreByValue();
+        valType = loc.getValueType();
+        isStoreByVal = loc.isStoreByValue();
         isStatisticsEnabled = loc.isStatisticsEnabled();
         isManagementEnabled = loc.isManagementEnabled();
         isReadThrough = loc.isReadThrough();
         isWriteThrough = loc.isWriteThrough();
 
+        offHeapMaxSize = loc.getOffHeapMaxSize();
+
         for (CacheMetrics e : metrics) {
             reads += e.getCacheGets();
             puts += e.getCachePuts();
@@ -290,7 +359,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
             putAvgTimeNanos += e.getAveragePutTime();
             getAvgTimeNanos += e.getAverageGetTime();
-            removeAvgTimeNanos += e.getAverageRemoveTime();
+            rmvAvgTimeNanos += e.getAverageRemoveTime();
             commitAvgTimeNanos += e.getAverageTxCommitTime();
             rollbackAvgTimeNanos += e.getAverageTxRollbackTime();
 
@@ -299,19 +368,35 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
             else
                 overflowSize = -1;
 
-            offHeapEntriesCount += e.getOffHeapEntriesCount();
+            offHeapGets += e.getOffHeapGets();
+            offHeapPuts += e.getOffHeapPuts();
+            offHeapRemoves += e.getOffHeapRemovals();
+            offHeapEvicts += e.getOffHeapEvictions();
+            offHeapHits += e.getOffHeapHits();
+            offHeapMisses += e.getOffHeapMisses();
+            offHeapEntriesCnt += e.getOffHeapEntriesCount();
+            offHeapPrimaryEntriesCnt += e.getOffHeapPrimaryEntriesCount();
+            offHeapBackupEntriesCnt += e.getOffHeapBackupEntriesCount();
             offHeapAllocatedSize += e.getOffHeapAllocatedSize();
 
+            swapGets += e.getSwapGets();
+            swapPuts += e.getSwapPuts();
+            swapRemoves += e.getSwapRemovals();
+            swapHits += e.getSwapHits();
+            swapMisses += e.getSwapMisses();
+            swapEntriesCnt += e.getSwapEntriesCount();
+            swapSize += e.getSwapSize();
+
             if (e.getDhtEvictQueueCurrentSize() > -1)
-                dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize();
+                dhtEvictQueueCurrSize += e.getDhtEvictQueueCurrentSize();
             else
-                dhtEvictQueueCurrentSize = -1;
+                dhtEvictQueueCurrSize = -1;
 
             txThreadMapSize += e.getTxThreadMapSize();
             txXidMapSize += e.getTxXidMapSize();
             txCommitQueueSize += e.getTxCommitQueueSize();
             txPrepareQueueSize += e.getTxPrepareQueueSize();
-            txStartVersionCountsSize += e.getTxStartVersionCountsSize();
+            txStartVerCountsSize += e.getTxStartVersionCountsSize();
             txCommittedVersionsSize += e.getTxCommittedVersionsSize();
             txRolledbackVersionsSize += e.getTxRolledbackVersionsSize();
 
@@ -336,9 +421,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
                 txDhtPrepareQueueSize = -1;
 
             if (e.getTxDhtStartVersionCountsSize() > -1)
-                txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize();
+                txDhtStartVerCountsSize += e.getTxDhtStartVersionCountsSize();
             else
-                txDhtStartVersionCountsSize = -1;
+                txDhtStartVerCountsSize = -1;
 
             if (e.getTxDhtCommittedVersionsSize() > -1)
                 txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize();
@@ -351,19 +436,19 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
                 txDhtRolledbackVersionsSize = -1;
 
             if (e.getWriteBehindTotalCriticalOverflowCount() > -1)
-                writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount();
+                writeBehindTotalCriticalOverflowCnt += e.getWriteBehindTotalCriticalOverflowCount();
             else
-                writeBehindTotalCriticalOverflowCount = -1;
+                writeBehindTotalCriticalOverflowCnt = -1;
 
             if (e.getWriteBehindCriticalOverflowCount() > -1)
-                writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount();
+                writeBehindCriticalOverflowCnt += e.getWriteBehindCriticalOverflowCount();
             else
-                writeBehindCriticalOverflowCount = -1;
+                writeBehindCriticalOverflowCnt = -1;
 
             if (e.getWriteBehindErrorRetryCount() > -1)
-                writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount();
+                writeBehindErrorRetryCnt += e.getWriteBehindErrorRetryCount();
             else
-                writeBehindErrorRetryCount = -1;
+                writeBehindErrorRetryCnt = -1;
         }
 
         int size = metrics.size();
@@ -371,7 +456,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
         if (size > 1) {
             putAvgTimeNanos /= size;
             getAvgTimeNanos /= size;
-            removeAvgTimeNanos /= size;
+            rmvAvgTimeNanos /= size;
             commitAvgTimeNanos /= size;
             rollbackAvgTimeNanos /= size;
         }
@@ -435,7 +520,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public float getAverageRemoveTime() {
-        return removeAvgTimeNanos;
+        return rmvAvgTimeNanos;
     }
 
     /** {@inheritDoc} */
@@ -469,8 +554,63 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapGets() {
+        return offHeapGets;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapPuts() {
+        return offHeapPuts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapRemovals() {
+        return offHeapRemoves;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapEvictions() {
+        return offHeapEvicts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapHits() {
+        return offHeapHits;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getOffHeapHitPercentage() {
+        if (offHeapHits == 0 || offHeapGets == 0)
+            return 0;
+
+        return (float) offHeapHits / offHeapGets * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapMisses() {
+        return offHeapMisses;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getOffHeapMissPercentage() {
+        if (offHeapMisses == 0 || offHeapGets == 0)
+            return 0;
+
+        return (float) offHeapMisses / offHeapGets * 100.0f;
+    }
+    /** {@inheritDoc} */
     @Override public long getOffHeapEntriesCount() {
-        return offHeapEntriesCount;
+        return offHeapEntriesCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapPrimaryEntriesCount() {
+        return offHeapPrimaryEntriesCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getOffHeapBackupEntriesCount() {
+        return offHeapBackupEntriesCnt;
     }
 
     /** {@inheritDoc} */
@@ -479,6 +619,62 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public long getOffHeapMaxSize() {
+        return offHeapMaxSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapGets() {
+        return swapGets;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapPuts() {
+        return swapPuts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapRemovals() {
+        return swapRemoves;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapHits() {
+        return swapHits;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapMisses() {
+        return swapMisses;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getSwapHitPercentage() {
+        if (swapHits == 0 || swapGets == 0)
+            return 0;
+
+        return (float) swapHits / swapGets * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getSwapMissPercentage() {
+        if (swapMisses == 0 || swapGets == 0)
+            return 0;
+
+        return (float) swapMisses / swapGets * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapEntriesCount() {
+        return swapEntriesCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getSwapSize() {
+        return swapSize;
+    }
+
+    /** {@inheritDoc} */
     @Override public int getSize() {
         return size;
     }
@@ -495,7 +691,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public int getDhtEvictQueueCurrentSize() {
-        return dhtEvictQueueCurrentSize;
+        return dhtEvictQueueCurrSize;
     }
 
     /** {@inheritDoc} */
@@ -520,7 +716,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public int getTxStartVersionCountsSize() {
-        return txStartVersionCountsSize;
+        return txStartVerCountsSize;
     }
 
     /** {@inheritDoc} */
@@ -555,7 +751,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public int getTxDhtStartVersionCountsSize() {
-        return txDhtStartVersionCountsSize;
+        return txDhtStartVerCountsSize;
     }
 
     /** {@inheritDoc} */
@@ -580,12 +776,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public int getWriteBehindFlushThreadCount() {
-        return writeBehindFlushThreadCount;
+        return writeBehindFlushThreadCnt;
     }
 
     /** {@inheritDoc} */
     @Override public long getWriteBehindFlushFrequency() {
-        return writeBehindFlushFrequency;
+        return writeBehindFlushFreq;
     }
 
     /** {@inheritDoc} */
@@ -595,22 +791,22 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public int getWriteBehindTotalCriticalOverflowCount() {
-        return writeBehindTotalCriticalOverflowCount;
+        return writeBehindTotalCriticalOverflowCnt;
     }
 
     /** {@inheritDoc} */
     @Override public int getWriteBehindCriticalOverflowCount() {
-        return writeBehindCriticalOverflowCount;
+        return writeBehindCriticalOverflowCnt;
     }
 
     /** {@inheritDoc} */
     @Override public int getWriteBehindErrorRetryCount() {
-        return writeBehindErrorRetryCount;
+        return writeBehindErrorRetryCnt;
     }
 
     /** {@inheritDoc} */
     @Override public int getWriteBehindBufferSize() {
-        return writeBehindBufferSize;
+        return writeBehindBufSize;
     }
 
     /** {@inheritDoc} */
@@ -620,12 +816,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
     /** {@inheritDoc} */
     @Override public String getValueType() {
-        return valueType;
+        return valType;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isStoreByValue() {
-        return isStoreByValue;
+        return isStoreByVal;
     }
 
     /** {@inheritDoc} */
@@ -666,31 +862,49 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
         out.writeFloat(putAvgTimeNanos);
         out.writeFloat(getAvgTimeNanos);
-        out.writeFloat(removeAvgTimeNanos);
+        out.writeFloat(rmvAvgTimeNanos);
         out.writeFloat(commitAvgTimeNanos);
         out.writeFloat(rollbackAvgTimeNanos);
 
         out.writeLong(overflowSize);
-        out.writeLong(offHeapEntriesCount);
+        out.writeLong(offHeapGets);
+        out.writeLong(offHeapPuts);
+        out.writeLong(offHeapRemoves);
+        out.writeLong(offHeapEvicts);
+        out.writeLong(offHeapHits);
+        out.writeLong(offHeapMisses);
+        out.writeLong(offHeapEntriesCnt);
+        out.writeLong(offHeapPrimaryEntriesCnt);
+        out.writeLong(offHeapBackupEntriesCnt);
         out.writeLong(offHeapAllocatedSize);
-        out.writeInt(dhtEvictQueueCurrentSize);
+        out.writeLong(offHeapMaxSize);
+
+        out.writeLong(swapGets);
+        out.writeLong(swapPuts);
+        out.writeLong(swapRemoves);
+        out.writeLong(swapHits);
+        out.writeLong(swapMisses);
+        out.writeLong(swapEntriesCnt);
+        out.writeLong(swapSize);
+
+        out.writeInt(dhtEvictQueueCurrSize);
         out.writeInt(txThreadMapSize);
         out.writeInt(txXidMapSize);
         out.writeInt(txCommitQueueSize);
         out.writeInt(txPrepareQueueSize);
-        out.writeInt(txStartVersionCountsSize);
+        out.writeInt(txStartVerCountsSize);
         out.writeInt(txCommittedVersionsSize);
         out.writeInt(txRolledbackVersionsSize);
         out.writeInt(txDhtThreadMapSize);
         out.writeInt(txDhtXidMapSize);
         out.writeInt(txDhtCommitQueueSize);
         out.writeInt(txDhtPrepareQueueSize);
-        out.writeInt(txDhtStartVersionCountsSize);
+        out.writeInt(txDhtStartVerCountsSize);
         out.writeInt(txDhtCommittedVersionsSize);
         out.writeInt(txDhtRolledbackVersionsSize);
-        out.writeInt(writeBehindTotalCriticalOverflowCount);
-        out.writeInt(writeBehindCriticalOverflowCount);
-        out.writeInt(writeBehindErrorRetryCount);
+        out.writeInt(writeBehindTotalCriticalOverflowCnt);
+        out.writeInt(writeBehindCriticalOverflowCnt);
+        out.writeInt(writeBehindErrorRetryCnt);
     }
 
     /** {@inheritDoc} */
@@ -706,30 +920,48 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
 
         putAvgTimeNanos = in.readFloat();
         getAvgTimeNanos = in.readFloat();
-        removeAvgTimeNanos = in.readFloat();
+        rmvAvgTimeNanos = in.readFloat();
         commitAvgTimeNanos = in.readFloat();
         rollbackAvgTimeNanos = in.readFloat();
 
         overflowSize = in.readLong();
-        offHeapEntriesCount = in.readLong();
+        offHeapGets = in.readLong();
+        offHeapPuts = in.readLong();
+        offHeapRemoves = in.readLong();
+        offHeapEvicts = in.readLong();
+        offHeapHits = in.readLong();
+        offHeapMisses = in.readLong();
+        offHeapEntriesCnt = in.readLong();
+        offHeapPrimaryEntriesCnt = in.readLong();
+        offHeapBackupEntriesCnt = in.readLong();
         offHeapAllocatedSize = in.readLong();
-        dhtEvictQueueCurrentSize = in.readInt();
+        offHeapMaxSize = in.readLong();
+
+        swapGets = in.readLong();
+        swapPuts = in.readLong();
+        swapRemoves = in.readLong();
+        swapHits = in.readLong();
+        swapMisses = in.readLong();
+        swapEntriesCnt = in.readLong();
+        swapSize = in.readLong();
+
+        dhtEvictQueueCurrSize = in.readInt();
         txThreadMapSize = in.readInt();
         txXidMapSize = in.readInt();
         txCommitQueueSize = in.readInt();
         txPrepareQueueSize = in.readInt();
-        txStartVersionCountsSize = in.readInt();
+        txStartVerCountsSize = in.readInt();
         txCommittedVersionsSize = in.readInt();
         txRolledbackVersionsSize = in.readInt();
         txDhtThreadMapSize = in.readInt();
         txDhtXidMapSize = in.readInt();
         txDhtCommitQueueSize = in.readInt();
         txDhtPrepareQueueSize = in.readInt();
-        txDhtStartVersionCountsSize = in.readInt();
+        txDhtStartVerCountsSize = in.readInt();
         txDhtCommittedVersionsSize = in.readInt();
         txDhtRolledbackVersionsSize = in.readInt();
-        writeBehindTotalCriticalOverflowCount = in.readInt();
-        writeBehindCriticalOverflowCount = in.readInt();
-        writeBehindErrorRetryCount = in.readInt();
+        writeBehindTotalCriticalOverflowCnt = in.readInt();
+        writeBehindCriticalOverflowCnt = in.readInt();
+        writeBehindErrorRetryCnt = in.readInt();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..772e849 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -121,6 +121,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         warnFirstEvict();
 
                     writeToSwap(part, cctx.toCacheKeyObject(kb), vb);
+
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapEvict();
                 }
                 catch (IgniteCheckedException e) {
                     log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e);
@@ -395,8 +398,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Reconstituted swap entry or {@code null} if entry is obsolete.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException
-    {
+    @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException {
         assert e != null;
 
         checkIteratorQueue();
@@ -425,9 +427,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         int part = cctx.affinity().partition(key);
 
         // First check off-heap store.
-        if (offheapEnabled)
-            if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())))
+        if (offheapEnabled) {
+            boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapRead(contains);
+
+            if (contains)
                 return true;
+        }
 
         if (swapEnabled) {
             assert key != null;
@@ -436,6 +444,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
                 cctx.deploy().globalLoader());
 
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onSwapRead(valBytes != null);
+
             return valBytes != null;
         }
 
@@ -444,7 +455,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key to read.
-     * @param keyBytes Key bytes.
      * @param part Key partition.
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
@@ -481,6 +491,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             if (readOffheap && offheapEnabled) {
                 byte[] bytes = offheap.get(spaceName, part, key, keyBytes);
 
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRead(bytes != null);
+
                 if (bytes != null)
                     return swapEntry(unmarshalSwapEntry(bytes));
             }
@@ -524,6 +537,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
 
+            if (cctx.config().isStatisticsEnabled()) {
+                if (entryBytes != null)
+                    cctx.cache().metrics0().onOffHeapRemove();
+
+                cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
+            }
+
             if (entryBytes != null) {
                 GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
 
@@ -567,8 +587,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Value from swap or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key,
-        final int part)
+    @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, final int part)
         throws IgniteCheckedException {
         if (!swapEnabled)
             return null;
@@ -582,6 +601,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onSwapRead(rmv != null);
+
                 if (rmv != null) {
                     try {
                         GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
@@ -611,6 +633,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                 null);
                         }
 
+                        if (cctx.config().isStatisticsEnabled())
+                            cctx.cache().metrics0().onSwapRemove();
+
                         // Always fire this event, since preloading depends on it.
                         onUnswapped(part, key, entry);
 
@@ -649,12 +674,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (!offheapEnabled && !swapEnabled)
             return null;
 
-        return read(entry.key(),
-            entry.key().valueBytes(cctx.cacheObjectContext()),
-            entry.partition(),
-            locked,
-            readOffheap,
-            readSwap);
+        return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
+            readOffheap, readSwap);
     }
 
     /**
@@ -730,6 +751,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         final GridCacheQueryManager qryMgr = cctx.queries();
 
         Collection<SwapKey> unprocessedKeys = null;
+
         final Collection<GridCacheBatchSwapEntry> res = new ArrayList<>(keys.size());
 
         // First try removing from offheap.
@@ -737,8 +759,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             for (KeyCacheObject key : keys) {
                 int part = cctx.affinity().partition(key);
 
-                byte[] entryBytes =
-                    offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                if(entryBytes != null && cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
 
                 if (entryBytes != null) {
                     GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
@@ -848,6 +872,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                     null);
                             }
 
+                            if (cctx.config().isStatisticsEnabled())
+                                cctx.cache().metrics0().onSwapRemove();
+
                             // Always fire this event, since preloading depends on it.
                             onUnswapped(swapKey.partition(), key, entry);
 
@@ -880,7 +907,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+        boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+        if(rmv && cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onOffHeapRemove();
+
+        return rmv;
     }
 
     /**
@@ -925,6 +957,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     return;
 
                 try {
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onSwapRemove();
+
                     GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
 
                     if (entry == null)
@@ -942,11 +977,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         // First try offheap.
         if (offheapEnabled) {
-            byte[] val = offheap.remove(spaceName,
-                part,
-                key.value(cctx.cacheObjectContext(), false),
+            byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false),
                 key.valueBytes(cctx.cacheObjectContext()));
 
+            if(val != null && cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapRemove();
+
             if (val != null) {
                 if (c != null)
                     c.apply(val); // Probably we should read value and apply closure before removing...
@@ -1007,6 +1043,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal());
 
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapWrite();
+
             if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
                 cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
                     EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
@@ -1035,11 +1074,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         if (offheapEnabled) {
             for (GridCacheBatchSwapEntry swapEntry : swapped) {
-                offheap.put(spaceName,
-                    swapEntry.partition(),
-                    swapEntry.key(),
-                    swapEntry.key().valueBytes(cctx.cacheObjectContext()),
-                    swapEntry.marshal());
+                offheap.put(spaceName, swapEntry.partition(), swapEntry.key(),
+                    swapEntry.key().valueBytes(cctx.cacheObjectContext()), swapEntry.marshal());
+
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapWrite();
 
                 if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
                     cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
@@ -1071,6 +1110,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         qryMgr.onSwap(batchSwapEntry.key());
                 }
             }
+
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onSwapWrite(batch.size());
         }
     }
 
@@ -1082,17 +1124,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param entry Entry bytes.
      * @throws IgniteCheckedException If failed.
      */
-    private void writeToSwap(int part,
-        KeyCacheObject key,
-        byte[] entry)
-        throws IgniteCheckedException
-    {
+    private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException {
         checkIteratorQueue();
 
         swapMgr.write(spaceName,
             new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
-            entry,
-            cctx.deploy().globalLoader());
+            entry, cctx.deploy().globalLoader());
+
+        if (cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onSwapWrite();
 
         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED))
             cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid) null, null,
@@ -1274,7 +1314,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     int part = cctx.affinity().partition(key);
 
-                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                    boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                    if(rmv && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
                 }
                 else
                     it.removeX();
@@ -1432,6 +1475,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 return it.hasNext();
             }
 
+            @SuppressWarnings("unchecked")
             @Override protected void onRemove() throws IgniteCheckedException {
                 if (cur == null)
                     throw new IllegalStateException("Method next() has not yet been called, or the remove() method " +
@@ -1616,7 +1660,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     int part = cctx.affinity().partition(key);
 
-                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                    boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                    if(rmv && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
                 }
 
                 @Override protected void onClose() throws IgniteCheckedException {
@@ -1646,7 +1693,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 int part = cctx.affinity().partition(key);
 
-                offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                if(rmv && cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
index 2ad07b5..5cdc72f 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
@@ -100,14 +100,94 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean,
     public long getOverflowSize();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Number of gets from off-heap memory.")
+    public long getOffHeapGets();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of puts to off-heap memory.")
+    public long getOffHeapPuts();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of removed entries from off-heap memory.")
+    public long getOffHeapRemovals();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of evictions from off-heap memory.")
+    public long getOffHeapEvictions();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of hits on off-heap memory.")
+    public long getOffHeapHits();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of hits on off-heap memory.")
+    public float getOffHeapHitPercentage();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of misses on off-heap memory.")
+    public long getOffHeapMisses();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of misses on off-heap memory.")
+    public float getOffHeapMissPercentage();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Number of entries stored in off-heap memory.")
     public long getOffHeapEntriesCount();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Number of primary entries stored in off-heap memory.")
+    public long getOffHeapPrimaryEntriesCount();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of backup stored in off-heap memory.")
+    public long getOffHeapBackupEntriesCount();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Memory size allocated in off-heap.")
     public long getOffHeapAllocatedSize();
 
     /** {@inheritDoc} */
+    @MXBeanDescription("Off-heap memory maximum size.")
+    public long getOffHeapMaxSize();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of gets from swap.")
+    public long getSwapGets();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of puts to swap.")
+    public long getSwapPuts();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of removed entries from swap.")
+    public long getSwapRemovals();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of hits on swap.")
+    public long getSwapHits();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of misses on swap.")
+    public long getSwapMisses();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of hits on swap.")
+    public float getSwapHitPercentage();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Percentage of misses on swap.")
+    public float getSwapMissPercentage();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Number of entries stored in swap.")
+    public long getSwapEntriesCount();
+
+    /** {@inheritDoc} */
+    @MXBeanDescription("Size of swap.")
+    public long getSwapSize();
+
+    /** {@inheritDoc} */
     @MXBeanDescription("Number of non-null values in the cache.")
     public int getSize();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 871512c..b13cc97 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.swapspace.*;
+
 import org.jetbrains.annotations.*;
 
 import javax.management.*;
@@ -453,19 +453,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
         boolean isSpiConsistent = false;
 
-        String tipStr = " (fix configuration or set " + "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)";
+        String tipStr = " (fix configuration or set " +
+            "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)";
 
         if (rmtCls == null) {
             if (!optional && starting)
-                throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr + " [name=" + name +
-                    ", loc=" + locCls + ']');
+                throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr +
+                    " [name=" + name + ", loc=" + locCls + ']');
 
             sb.a(format(">>> Remote SPI with the same name is not configured: " + name, locCls));
         }
         else if (!locCls.equals(rmtCls)) {
             if (!optional && starting)
-                throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr + " [name=" + name +
-                    ", loc=" + locCls + ", rmt=" + rmtCls + ']');
+                throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr +
+                    " [name=" + name + ", loc=" + locCls + ", rmt=" + rmtCls + ']');
 
             sb.a(format(">>> Remote SPI with the same name is of different type: " + name, locCls, rmtCls));
         }
@@ -627,27 +628,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
-            @Nullable ClassLoader ldr) {
-            /* No-op. */
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public int partition(String cacheName, Object key) {
             return -1;
         }
 
         /** {@inheritDoc} */
-        @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
         @Override public Collection<ClusterNode> nodes() {
             return  locNode == null  ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
         }
@@ -713,12 +698,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
-            @Nullable ClassLoader ldr) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public MessageFormatter messageFormatter() {
             return msgFormatter;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 6852b6d..55f46e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
-import org.apache.ignite.spi.swapspace.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -253,30 +252,6 @@ public interface IgniteSpiContext {
     public <K> boolean containsKey(String cacheName, K key);
 
     /**
-     * Writes object to swap.
-     *
-     * @param spaceName Swap space name.
-     * @param key Key.
-     * @param val Value.
-     * @param ldr Class loader (optional).
-     * @throws IgniteException If any exception occurs.
-     */
-    public void writeToSwap(String spaceName, Object key, @Nullable Object val, @Nullable ClassLoader ldr)
-        throws IgniteException;
-
-    /**
-     * Reads object from swap.
-     *
-     * @param spaceName Swap space name.
-     * @param key Key.
-     * @param ldr Class loader (optional).
-     * @return Swapped value.
-     * @throws IgniteException If any exception occurs.
-     */
-    @Nullable public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr)
-        throws IgniteException;
-
-    /**
      * Calculates partition number for given key.
      *
      * @param cacheName Cache name.
@@ -286,16 +261,6 @@ public interface IgniteSpiContext {
     public int partition(String cacheName, Object key);
 
     /**
-     * Removes object from swap.
-     *
-     * @param spaceName Swap space name.
-     * @param key Key.
-     * @param ldr Class loader (optional).
-     * @throws IgniteException If any exception occurs.
-     */
-    public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) throws IgniteException;
-
-    /**
      * Validates that new node can join grid topology, this method is called on coordinator
      * node before new node joins topology.
      *
@@ -322,18 +287,6 @@ public interface IgniteSpiContext {
     public SecuritySubject authenticatedSubject(UUID subjId) throws IgniteException;
 
     /**
-     * Reads swapped cache value from off-heap and swap.
-     *
-     * @param spaceName Off-heap space name.
-     * @param key Key.
-     * @param ldr Class loader for unmarshalling.
-     * @return Value.
-     * @throws IgniteException If any exception occurs.
-     */
-    @Nullable public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
-        @Nullable ClassLoader ldr) throws IgniteException;
-
-    /**
      * Gets message formatter.
      *
      * @return Message formatter.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e7db285..7a88426 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -387,15 +387,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         Space space = space(spaceName, false);
 
-        if (space == null)
-            return;
-
-        byte[] val = space.remove(key, c != null);
+        byte[] val = space == null ? null : space.remove(key, c != null);
 
         if (c != null)
             c.apply(val);
 
-        notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
+        if (space != null)
+             notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
     }
 
     /** {@inheritDoc} */


[05/10] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.

Posted by se...@apache.org.
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8455c7a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8455c7a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8455c7a6

Branch: refs/heads/ignite-943
Commit: 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b
Parents: 3538819
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:40:26 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:40:26 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/igfs/IgfsUserContext.java | 119 +++++++++++
 .../hadoop/fs/HadoopLazyConcurrentMap.java      | 204 +++++++++++++++++++
 2 files changed, 323 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+    /** Thread local to hold the current user context. */
+    private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+    /**
+     * Executes given callable in the given user context.
+     * The main contract of this method is that {@link #currentUser()} method invoked
+     * inside closure always returns 'user' this callable executed with.
+     * @param user the user name to invoke closure on behalf of.
+     * @param clo the closure to execute
+     * @param <T> The type of closure result.
+     * @return the result of closure execution.
+     * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+     */
+    public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+        final String ctxUser = userStackThreadLocal.get();
+
+        if (F.eq(ctxUser, user))
+            return clo.apply(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clo.apply();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts
+     * callable that throws checked Exception.
+     * The Exception is not ever wrapped anyhow.
+     * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is:
+     * <pre name="code" class="java">
+     *  public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+     *      try {
+     *          return IgfsUserContext.doAs(user, new Callable<Foo>() {
+     *              &#64;Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+     *                  return makeSomeFoo(); // do the job
+     *              }
+     *          });
+     *      }
+     *      catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+     *          throw e;
+     *      }
+     *      catch (Exception e) {
+     *          throw new AssertionError("Must never go there.");
+     *      }
+     *  }
+     * </pre>
+     * @param user the user name to invoke closure on behalf of.
+     * @param clbl the Callable to execute
+     * @param <T> The type of callable result.
+     * @return the result of closure execution.
+     * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+     */
+    public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+        if (F.isEmpty(user))
+            throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+        final String ctxUser = userStackThreadLocal.get();
+
+        if (F.eq(ctxUser, user))
+            return clbl.call(); // correct context is already there
+
+        userStackThreadLocal.set(user);
+
+        try {
+            return clbl.call();
+        }
+        finally {
+            userStackThreadLocal.set(ctxUser);
+        }
+    }
+
+    /**
+     * Gets the current context user.
+     * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+     * return null. Otherwise it will return the user name set in the most lower
+     * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+     * @return The current user, may be null.
+     */
+    @Nullable public static String currentUser() {
+        return userStackThreadLocal.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..71b38c4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ *
+ * Despite of the name, does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+    /** The map storing the actual values. */
+    private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+    /** The factory passed in by the client. Will be used for lazy value creation. */
+    private final ValueFactory<K, V> factory;
+
+    /** Lock used to close the objects. */
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+    /** Flag indicating that this map is closed and cleared. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     * @param factory the factory to create new values lazily.
+     */
+    public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Gets cached or creates a new value of V.
+     * Never returns null.
+     * @param k the key to associate the value with.
+     * @return the cached or newly created value, never null.
+     * @throws IgniteException on error
+     */
+    public V getOrCreate(K k) {
+        ValueWrapper w = map.get(k);
+
+        if (w == null) {
+            closeLock.readLock().lock();
+
+            try {
+                if (closed)
+                    throw new IllegalStateException("Failed to create value for key [" + k
+                        + "]: the map is already closed.");
+
+                final ValueWrapper wNew = new ValueWrapper(k);
+
+                w = map.putIfAbsent(k, wNew);
+
+                if (w == null) {
+                    wNew.init();
+
+                    w = wNew;
+                }
+            }
+            finally {
+                closeLock.readLock().unlock();
+            }
+        }
+
+        try {
+            V v = w.getValue();
+
+            assert v != null;
+
+            return v;
+        }
+        catch (IgniteCheckedException ie) {
+            throw new IgniteException(ie);
+        }
+    }
+
+    /**
+     * Clears the map and closes all the values.
+     */
+    public void close() throws IgniteCheckedException {
+        closeLock.writeLock().lock();
+
+        try {
+            closed = true;
+
+            Exception err = null;
+
+            Set<K> keySet = map.keySet();
+
+            for (K key : keySet) {
+                V v = null;
+
+                try {
+                    v = map.get(key).getValue();
+                }
+                catch (IgniteCheckedException ignore) {
+                    // No-op.
+                }
+
+                if (v != null) {
+                    try {
+                        v.close();
+                    }
+                    catch (Exception err0) {
+                        if (err == null)
+                            err = err0;
+                    }
+                }
+            }
+
+            map.clear();
+
+            if (err != null)
+                throw new IgniteCheckedException(err);
+        }
+        finally {
+            closeLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Helper class that drives the lazy value creation.
+     */
+    private class ValueWrapper {
+        /** Future. */
+        private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+        /** the key */
+        private final K key;
+
+        /**
+         * Creates new wrapper.
+         */
+        private ValueWrapper(K key) {
+            this.key = key;
+        }
+
+        /**
+         * Initializes the value using the factory.
+         */
+        private void init() {
+            try {
+                final V v0 = factory.createValue(key);
+
+                if (v0 == null)
+                    throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+
+                fut.onDone(v0);
+            }
+            catch (Throwable e) {
+                fut.onDone(e);
+            }
+        }
+
+        /**
+         * Gets the available value or blocks until the value is initialized.
+         * @return the value, never null.
+         * @throws IgniteCheckedException on error.
+         */
+        V getValue() throws IgniteCheckedException {
+            return fut.get();
+        }
+    }
+
+    /**
+     * Interface representing the factory that creates map values.
+     * @param <K> the type of the key.
+     * @param <V> the type of the value.
+     */
+    public interface ValueFactory <K, V> {
+        /**
+         * Creates the new value. Should never return null.
+         *
+         * @param key the key to create value for
+         * @return the value.
+         * @throws IgniteException on failure.
+         */
+        public V createValue(K key);
+    }
+}
\ No newline at end of file


[03/10] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 7dca049..f23c62c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
     /** IGFS name. */
     private final String igfs;
 
+    /** The user this out proc is performing on behalf of. */
+    private final String userName;
+
     /** Client log. */
     private final Log log;
 
@@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
-        this(host, port, grid, igfs, false, log);
+    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(host, port, grid, igfs, false, log, user);
     }
 
     /**
@@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
-        this(null, port, grid, igfs, true, log);
+    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(null, port, grid, igfs, true, log, user);
     }
 
     /**
@@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
+    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
         throws IOException {
         assert host != null && !shmem || host == null && shmem :
             "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
@@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         this.grid = grid;
         this.igfs = igfs;
         this.log = log;
+        this.userName = IgfsUtils.fixUserName(user);
 
         io = HadoopIgfsIpcIo.get(log, endpoint);
 
@@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(INFO);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(UPDATE);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.accessTime(accessTime);
         msg.modificationTime(modificationTime);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(RENAME);
         msg.path(src);
         msg.destinationPath(dest);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(DELETE);
         msg.path(path);
         msg.flag(recursive);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.start(start);
         msg.length(len);
+        msg.userName(userName);
 
         return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
     }
@@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(PATH_SUMMARY);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(SUMMARY_RES).get();
     }
@@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(MAKE_DIRECTORIES);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(LIST_FILES);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_COL_RES).get();
     }
@@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(LIST_PATHS);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(PATH_COL_RES).get();
     }
@@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(OPEN_READ);
         msg.path(path);
         msg.flag(false);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(true);
         msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -321,6 +337,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.properties(props);
         msg.replication(replication);
         msg.blockSize(blockSize);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(create);
         msg.properties(props);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
             }
         };
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return userName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1dada21..7d0db49 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
     /** Logger. */
     private final Log log;
 
+    /** The user name this wrapper works on behalf of. */
+    private final String userName;
+
     /**
      * Constructor.
      *
@@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
      * @param conf Configuration.
      * @param log Current logger.
      */
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+            throws IOException {
         try {
             this.authority = authority;
             this.endpoint = new HadoopIgfsEndpoint(authority);
             this.logDir = logDir;
             this.conf = conf;
             this.log = log;
+            this.userName = user;
         }
         catch (IgniteCheckedException e) {
             throw new IOException("Failed to parse endpoint: " + authority, e);
@@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsInProc(igfs, log);
+                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
                 catch (IOException | IgniteCheckedException e) {
                     if (e instanceof HadoopIgfsCommunicationException)
-                        hadoop.close(true);
+                        if (hadoop != null)
+                            hadoop.close(true);
 
                     if (log.isDebugEnabled())
                         log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
@@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+                    hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
@@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
                 try {
                     hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                        log);
+                        log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
@@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
             HadoopIgfsEx hadoop = null;
 
             try {
-                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
 
                 curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index e9c859bd..dd18c66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -239,9 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            FileSystem fs = FileSystem.get(jobConf());
-
-            HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+            FileSystem.get(jobConf());
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index d11cabb..9bcd5de 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
@@ -39,6 +40,7 @@ import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
     /** Thread count for multithreaded tests. */
     private static final int THREAD_CNT = 8;
 
+    /** Secondary file system user. */
+    private static final String SECONDARY_FS_USER = "secondary-default";
+
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
@@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         if (mode != PRIMARY)
             cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(),
-                secondaryFileSystemConfigPath()));
+                secondaryFileSystemConfigPath(), SECONDARY_FS_USER));
 
         cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
         cfg.setManagementPort(-1);
@@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath()));
 
-        fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+
+        // Create Fs on behalf of the client user:
+        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override public Object run() throws Exception {
+                fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+
+                return null;
+            }
+        });
 
         barrier = new CyclicBarrier(THREAD_CNT);
     }
 
+    /**
+     * Gets the user the Fs client operates on bahalf of.
+     * @return The user the Fs client operates on bahalf of.
+     */
+    protected String getClientFsUser() {
+        return "foo";
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         try {
@@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
     /** @throws Exception If failed. */
     public void testStatus() throws Exception {
+        Path file1 = new Path("/file1");
 
-        try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class),
+        try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class),
             Options.CreateOpts.perms(FsPermission.getDefault()))) {
             file.write(new byte[1024 * 1024]);
         }
 
         FsStatus status = fs.getFsStatus();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner());
+
         assertEquals(4, grid(0).cluster().nodes().size());
 
         long used = 0, max = 0;
@@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         os.close();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
         fs.setOwner(file, "aUser", "aGroup");
 
         assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         int cnt = 2 * 1024;
 
-        FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
-            Options.CreateOpts.perms(FsPermission.getDefault()));
+        try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
+            Options.CreateOpts.perms(FsPermission.getDefault()))) {
 
-        for (long i = 0; i < cnt; i++)
-            out.writeLong(i);
+            for (long i = 0; i < cnt; i++)
+                out.writeLong(i);
+        }
 
-        out.close();
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
 
-        FSDataInputStream in = fs.open(file, 1024);
+        try (FSDataInputStream in = fs.open(file, 1024)) {
 
-        for (long i = 0; i < cnt; i++)
-            assertEquals(i, in.readLong());
-
-        in.close();
+            for (long i = 0; i < cnt; i++)
+                assertEquals(i, in.readLong());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -1191,6 +1218,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
         assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+        assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+        assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
     }
 
     /** @throws Exception If failed. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 9e84c51..b089995 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
             primaryConfFullPath = null;
 
         SecondaryFileSystemProvider provider =
-            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null);
+            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
 
-        primaryFs = provider.createFileSystem();
+        primaryFs = provider.createFileSystem(null);
 
         primaryFsUri = provider.uri();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index d9a3c59..b828aad 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
@@ -43,6 +44,7 @@ import org.jsr166.*;
 import java.io.*;
 import java.lang.reflect.*;
 import java.net.*;
+import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -72,6 +74,9 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
     /** Secondary file system configuration path. */
     private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
 
+    /** Secondary file system user. */
+    private static final String SECONDARY_FS_USER = "secondary-default";
+
     /** Secondary endpoint configuration. */
     protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG;
 
@@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
         endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500";
     }
 
+    /**
+     * Gets the user the Fs client operates on bahalf of.
+     * @return The user the Fs client operates on bahalf of.
+     */
+    protected String getClientFsUser() {
+        return "foo";
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true);
@@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem);
 
-        fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+        UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+        assertNotNull(clientUgi);
+
+        // Create the Fs on behalf of the specific user:
+        clientUgi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override public Object run() throws Exception {
+                fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+
+                return null;
+            }
+        });
 
         barrier = new CyclicBarrier(THREAD_CNT);
     }
@@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
         cfg.setDefaultMode(mode);
 
         if (mode != PRIMARY)
-            cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH));
+            cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+                SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
 
         cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
 
@@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         os.close();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
         fs.setOwner(file, "aUser", "aGroup");
 
         assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         int cnt = 2 * 1024;
 
-        FSDataOutputStream out = fs.create(file, true, 1024);
-
-        for (long i = 0; i < cnt; i++)
-            out.writeLong(i);
+        try (FSDataOutputStream out = fs.create(file, true, 1024)) {
 
-        out.close();
+            for (long i = 0; i < cnt; i++)
+                out.writeLong(i);
+        }
 
-        FSDataInputStream in = fs.open(file, 1024);
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
 
-        for (long i = 0; i < cnt; i++)
-            assertEquals(i, in.readLong());
+        try (FSDataInputStream in = fs.open(file, 1024)) {
 
-        in.close();
+            for (long i = 0; i < cnt; i++)
+                assertEquals(i, in.readLong());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         String path = fs.getFileStatus(file).getPath().toString();
 
-        assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file"));
+        assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file"));
     }
 
     /** @throws Exception If failed. */
@@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
     public void testGetWorkingDirectoryIfDefault() throws Exception {
         String path = fs.getWorkingDirectory().toString();
 
-        assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous")));
+        assertTrue(path.endsWith("/user/" + getClientFsUser()));
     }
 
     /** @throws Exception If failed. */
@@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
     @SuppressWarnings("OctalInteger")
     public void testMkdirs() throws Exception {
         Path fsHome = new Path(PRIMARY_URI);
-        Path dir = new Path(fsHome, "/tmp/staging");
-        Path nestedDir = new Path(dir, "nested");
+        final Path dir = new Path(fsHome, "/tmp/staging");
+        final Path nestedDir = new Path(dir, "nested");
 
-        FsPermission dirPerm = FsPermission.createImmutable((short)0700);
-        FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
+        final FsPermission dirPerm = FsPermission.createImmutable((short)0700);
+        final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
 
         assertTrue(fs.mkdirs(dir, dirPerm));
         assertTrue(fs.mkdirs(nestedDir, nestedDirPerm));
 
         assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
         assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+        assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+        assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
     }
 
     /** @throws Exception If failed. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
index b92b213..fcfd587 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
@@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest
         try {
             switchHandlerErrorFlag(true);
 
-            HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG);
+            HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null);
 
             client.handshake(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
index e103c5f..2c17ba9 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
@@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe
 
         Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null);
 
+        cache.clear(); // avoid influence of previous tests in the same process.
+
         String name = "igfs:" + getTestGridName(0) + "@";
 
         Configuration cfg = new Configuration();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
index 8cf31a2..5f90bd4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*;
  * Test file systems for the working directory multi-threading support.
  */
 public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+    /** the number of threads */
     private static final int THREAD_COUNT = 3;
 
     /** {@inheritDoc} */
@@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
                 try {
                     int curThreadNum = threadNum.getAndIncrement();
 
-                    FileSystem fs = FileSystem.get(uri, cfg);
-
-                    HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
-
                     if ("file".equals(uri.getScheme()))
                         FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
 
@@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
     }
 
     /**
-     * Test IGFS multi-thread working directory.
-     *
-     * @throws Exception If fails.
-     */
-    public void testIgfs() throws Exception {
-        testFileSystem(URI.create(igfsScheme()));
-    }
-
-    /**
-     * Test HDFS multi-thread working directory.
-     *
-     * @throws Exception If fails.
-     */
-    public void testHdfs() throws Exception {
-        testFileSystem(URI.create("hdfs://localhost/"));
-    }
-
-    /**
      * Test LocalFS multi-thread working directory.
      *
      * @throws Exception If fails.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
index 8a046e0..89bf830 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
 
             int sigma = max((int)ceil(precission * exp), 5);
 
-            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission +
+            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission +
                 " sigma: " + sigma);
 
-            assertTrue(abs(exp - levelsCnts[level]) <= sigma);
+            assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
         }
     }