You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/29 17:58:53 UTC
[01/10] incubator-ignite git commit: ignite-37 Improve offheap
metrics for cache
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-943 d10fe3e90 -> d10120d67
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
new file mode 100644
index 0000000..3d44600
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/CacheLocalOffHeapAndSwapMetricsSelfTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.local;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.swapspace.file.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class CacheLocalOffHeapAndSwapMetricsSelfTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 1;
+
+ /** Keys count. */
+ private static final int KEYS_CNT = 1000;
+
+ /** Max size. */
+ private static final int MAX_SIZE = 100;
+
+ /** Entry size. */
+ private static final int ENTRY_SIZE = 86; // Calculated as allocated size divided on entries count.
+
+ /** Offheap max count. */
+ private static final int OFFHEAP_MAX_CNT = KEYS_CNT / 2;
+
+ /** Offheap max size. */
+ private static final int OFFHEAP_MAX_SIZE = ENTRY_SIZE * OFFHEAP_MAX_CNT;
+
+ /** Cache. */
+ private IgniteCache<Integer, Integer> cache;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+
+ return cfg;
+ }
+
+ /**
+ * @param offHeapSize Max off-heap size.
+ * @param swapEnabled Swap enabled.
+ */
+ private void createCache(int offHeapSize, boolean swapEnabled) {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+
+ ccfg.setStatisticsEnabled(true);
+
+ ccfg.setCacheMode(CacheMode.LOCAL);
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+ ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);
+
+ ccfg.setOffHeapMaxMemory(offHeapSize);
+ ccfg.setSwapEnabled(swapEnabled);
+
+ ccfg.setEvictionPolicy(new FifoEvictionPolicy(MAX_SIZE));
+
+ cache = grid(0).getOrCreateCache(ccfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ if (cache != null)
+ cache.close();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testOffHeapMetrics() throws Exception {
+ createCache(0, false);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(i, i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapGets());
+ assertEquals(0, cache.metrics().getOffHeapHits());
+ assertEquals(0f, cache.metrics().getOffHeapHitPercentage());
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapMisses());
+ assertEquals(100f, cache.metrics().getOffHeapMissPercentage());
+ assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(0, cache.metrics().getOffHeapEvictions());
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapEntriesCount());
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.get(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapGets());
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapHits());
+ assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getOffHeapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapMisses());
+ assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getOffHeapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(0, cache.metrics().getOffHeapEvictions());
+ assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ for (int i = KEYS_CNT; i < KEYS_CNT * 2; i++)
+ cache.get(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT * 3, cache.metrics().getOffHeapGets());
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapHits());
+ assertEquals(100 / 3.0, cache.metrics().getOffHeapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapMisses());
+ assertEquals(100 - (100 / 3.0), cache.metrics().getOffHeapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(0, cache.metrics().getOffHeapEvictions());
+ assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.remove(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT * 4 - MAX_SIZE, cache.metrics().getOffHeapGets());
+ assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getOffHeapHits());
+ assertEquals(100 * (KEYS_CNT * 2.0 - MAX_SIZE) / (KEYS_CNT * 4.0 - MAX_SIZE),
+ cache.metrics().getOffHeapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapMisses());
+ assertEquals(100 * KEYS_CNT * 2.0 / (KEYS_CNT * 4.0 - MAX_SIZE),
+ cache.metrics().getOffHeapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(0, cache.metrics().getOffHeapEvictions());
+ assertEquals(0, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testSwapMetrics() throws Exception {
+ createCache(-1, true);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(i, i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapGets());
+ assertEquals(0, cache.metrics().getSwapHits());
+ assertEquals(0f, cache.metrics().getSwapHitPercentage());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+ assertEquals(100f, cache.metrics().getSwapMissPercentage());
+ assertEquals(0, cache.metrics().getSwapRemovals());
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapEntriesCount());
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.get(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT * 2, cache.metrics().getSwapGets());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+ assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+ assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+ assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getSwapEntriesCount());
+
+ for (int i = KEYS_CNT; i < KEYS_CNT * 2; i++)
+ cache.get(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT * 3, cache.metrics().getSwapGets());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+ assertEquals(100 / 3.0, cache.metrics().getSwapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+ assertEquals(100 - (100 / 3.0), cache.metrics().getSwapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+ assertEquals(KEYS_CNT - MAX_SIZE, cache.metrics().getSwapEntriesCount());
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.remove(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT * 4 - MAX_SIZE, cache.metrics().getSwapGets());
+ assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getSwapHits());
+ assertEquals(100 * (KEYS_CNT * 2.0 - MAX_SIZE) / (KEYS_CNT * 4.0 - MAX_SIZE),
+ cache.metrics().getSwapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+ assertEquals(100 * KEYS_CNT * 2.0 / (KEYS_CNT * 4.0 - MAX_SIZE),
+ cache.metrics().getSwapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2 - MAX_SIZE, cache.metrics().getSwapRemovals());
+
+ assertEquals(0, cache.metrics().getSwapEntriesCount());
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testOffHeapAndSwapMetrics() throws Exception {
+ createCache(OFFHEAP_MAX_SIZE, true);
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(i, i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapGets());
+ assertEquals(0, cache.metrics().getOffHeapHits());
+ assertEquals(0f, cache.metrics().getOffHeapHitPercentage());
+ assertEquals(KEYS_CNT, cache.metrics().getOffHeapMisses());
+ assertEquals(100f, cache.metrics().getOffHeapMissPercentage());
+ assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(KEYS_CNT - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapGets());
+ assertEquals(0, cache.metrics().getSwapHits());
+ assertEquals(0f, cache.metrics().getSwapHitPercentage());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+ assertEquals(100f, cache.metrics().getSwapMissPercentage());
+ assertEquals(0, cache.metrics().getSwapRemovals());
+
+ assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapEntriesCount());
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.get(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapGets());
+ assertEquals(0, cache.metrics().getOffHeapHits());
+ assertEquals(0.0, cache.metrics().getOffHeapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getOffHeapMisses());
+ assertEquals(100.0, cache.metrics().getOffHeapMissPercentage(), 0.1);
+ assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(cache.metrics().getCacheEvictions() - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT * 2, cache.metrics().getSwapGets());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+ assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getSwapMisses());
+ assertEquals(100 * KEYS_CNT / (KEYS_CNT * 2.0), cache.metrics().getSwapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+ assertEquals(KEYS_CNT - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapEntriesCount());
+
+ for (int i = KEYS_CNT; i < KEYS_CNT * 2; i++)
+ cache.get(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT * 3, cache.metrics().getOffHeapGets());
+ assertEquals(0, cache.metrics().getOffHeapHits());
+ assertEquals(0.0, cache.metrics().getOffHeapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 3, cache.metrics().getOffHeapMisses());
+ assertEquals(100.0, cache.metrics().getOffHeapMissPercentage(), 0.1);
+ assertEquals(0, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(cache.metrics().getCacheEvictions() - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT * 3, cache.metrics().getSwapGets());
+ assertEquals(KEYS_CNT, cache.metrics().getSwapHits());
+ assertEquals(100 / 3.0, cache.metrics().getSwapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+ assertEquals(100 - (100 / 3.0), cache.metrics().getSwapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT, cache.metrics().getSwapRemovals());
+
+ assertEquals(KEYS_CNT - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapEntriesCount());
+
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.remove(i);
+
+ printStat();
+
+ assertEquals(cache.metrics().getCacheEvictions(), cache.metrics().getOffHeapPuts());
+ assertEquals(KEYS_CNT * 4 - MAX_SIZE, cache.metrics().getOffHeapGets());
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapHits());
+ assertEquals(100 * OFFHEAP_MAX_CNT / (KEYS_CNT * 4.0 - MAX_SIZE),
+ cache.metrics().getOffHeapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 4 - OFFHEAP_MAX_CNT - MAX_SIZE, cache.metrics().getOffHeapMisses());
+ assertEquals(100 * (KEYS_CNT * 4 - OFFHEAP_MAX_CNT - MAX_SIZE) / (KEYS_CNT * 4.0 - MAX_SIZE),
+ cache.metrics().getOffHeapMissPercentage(), 0.1);
+ assertEquals(OFFHEAP_MAX_CNT, cache.metrics().getOffHeapRemovals());
+
+ assertEquals(cache.metrics().getCacheEvictions() - OFFHEAP_MAX_CNT, cache.metrics().getOffHeapEvictions());
+ assertEquals(0, cache.metrics().getOffHeapEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapPrimaryEntriesCount());
+ assertEquals(0, cache.metrics().getOffHeapBackupEntriesCount());
+
+ assertEquals(cache.metrics().getOffHeapEvictions(), cache.metrics().getSwapPuts());
+ assertEquals(KEYS_CNT * 4 - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapGets());
+ assertEquals(KEYS_CNT * 2 - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapHits());
+ assertEquals(100 * (KEYS_CNT * 2.0 - MAX_SIZE - OFFHEAP_MAX_CNT) / (KEYS_CNT * 4.0 - MAX_SIZE - OFFHEAP_MAX_CNT),
+ cache.metrics().getSwapHitPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2, cache.metrics().getSwapMisses());
+ assertEquals(100 * KEYS_CNT * 2.0 / (KEYS_CNT * 4.0 - MAX_SIZE - OFFHEAP_MAX_CNT),
+ cache.metrics().getSwapMissPercentage(), 0.1);
+ assertEquals(KEYS_CNT * 2 - MAX_SIZE - OFFHEAP_MAX_CNT, cache.metrics().getSwapRemovals());
+
+ assertEquals(0, cache.metrics().getSwapEntriesCount());
+ }
+
+ /**
+ * Prints stats.
+ */
+ protected void printStat() {
+ System.out.println("!!! -------------------------------------------------------");
+ System.out.println("!!! Puts: cache = " + cache.metrics().getCachePuts() +
+ ", offheap = " + cache.metrics().getOffHeapPuts() +
+ ", swap = " + cache.metrics().getSwapPuts());
+ System.out.println("!!! Gets: cache = " + cache.metrics().getCacheGets() +
+ ", offheap = " + cache.metrics().getOffHeapGets() +
+ ", swap = " + cache.metrics().getSwapGets());
+ System.out.println("!!! Removes: cache = " + cache.metrics().getCacheRemovals() +
+ ", offheap = " + cache.metrics().getOffHeapRemovals() +
+ ", swap = " + cache.metrics().getSwapRemovals());
+ System.out.println("!!! Evictions: cache = " + cache.metrics().getCacheEvictions() +
+ ", offheap = " + cache.metrics().getOffHeapEvictions() +
+ ", swap = none" );
+ System.out.println("!!! Hits: cache = " + cache.metrics().getCacheHits() +
+ ", offheap = " + cache.metrics().getOffHeapHits() +
+ ", swap = " + cache.metrics().getSwapHits());
+ System.out.println("!!! Hit(%): cache = " + cache.metrics().getCacheHitPercentage() +
+ ", offheap = " + cache.metrics().getOffHeapHitPercentage() +
+ ", swap = " + cache.metrics().getSwapHitPercentage());
+ System.out.println("!!! Misses: cache = " + cache.metrics().getCacheMisses() +
+ ", offheap = " + cache.metrics().getOffHeapMisses() +
+ ", swap = " + cache.metrics().getSwapMisses());
+ System.out.println("!!! Miss(%): cache = " + cache.metrics().getCacheMissPercentage() +
+ ", offheap = " + cache.metrics().getOffHeapMissPercentage() +
+ ", swap = " + cache.metrics().getSwapMissPercentage());
+ System.out.println("!!! Entries: cache = " + cache.metrics().getSize() +
+ ", offheap = " + cache.metrics().getOffHeapEntriesCount() +
+ ", swap = " + cache.metrics().getSwapEntriesCount());
+ System.out.println("!!! Size: cache = none" +
+ ", offheap = " + cache.metrics().getOffHeapAllocatedSize() +
+ ", swap = " + cache.metrics().getSwapSize());
+ System.out.println();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index bc04f90..5867fb8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,7 +29,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
import org.jetbrains.annotations.*;
import java.io.*;
@@ -447,28 +447,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
- @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
- @Nullable ClassLoader ldr) {
- /* No-op. */
- }
-
- /** {@inheritDoc} */
- @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public int partition(String cacheName, Object key) {
return -1;
}
/** {@inheritDoc} */
- @Override public void removeFromSwap(String spaceName, Object key,
- @Nullable ClassLoader ldr) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
return null;
}
@@ -484,12 +467,6 @@ public class GridSpiTestContext implements IgniteSpiContext {
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
- @Nullable ClassLoader ldr) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public MessageFormatter messageFormatter() {
if (formatter == null) {
formatter = new MessageFormatter() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index 1adf55f..9a0e5fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -39,6 +39,7 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheReplicatedMetricsSelfTest.class);
suite.addTestSuite(GridCachePartitionedMetricsSelfTest.class);
suite.addTestSuite(GridCachePartitionedHitsAndMissesSelfTest.class);
+ suite.addTestSuite(CacheLocalOffHeapAndSwapMetricsSelfTest.class);
// Atomic cache.
suite.addTestSuite(GridCacheAtomicLocalMetricsSelfTest.class);
[09/10] incubator-ignite git commit: # regenerated PDF
Posted by se...@apache.org.
# regenerated PDF
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5c30f9cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5c30f9cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5c30f9cf
Branch: refs/heads/ignite-943
Commit: 5c30f9cf5c490b0d6c065e89557a8b8f3040eda5
Parents: 6cd1a6e 3f012b7
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 29 18:42:16 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 29 18:42:16 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/CacheMetrics.java | 187 +++++++--
.../internal/managers/GridManagerAdapter.java | 59 +--
.../processors/cache/CacheMetricsImpl.java | 305 +++++++++++++-
.../cache/CacheMetricsMXBeanImpl.java | 100 +++++
.../processors/cache/CacheMetricsSnapshot.java | 380 +++++++++++++----
.../processors/cache/GridCacheSwapManager.java | 118 ++++--
.../ignite/mxbean/CacheMetricsMXBean.java | 80 ++++
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 35 +-
.../org/apache/ignite/spi/IgniteSpiContext.java | 47 ---
.../spi/swapspace/file/FileSwapSpaceSpi.java | 8 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 412 +++++++++++++++++++
.../testframework/GridSpiTestContext.java | 25 +-
.../IgniteCacheMetricsSelfTestSuite.java | 1 +
13 files changed, 1457 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5c30f9cf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index af19077,3dcda3c..4e6a447
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@@ -129,24 -161,162 +164,168 @@@ public class CacheMetricsImpl implement
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return offHeapGets.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return offHeapPuts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return offHeapRemoves.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return offHeapEvicts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return offHeapHits.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ long hits0 = offHeapHits.get();
+ long gets0 = offHeapGets.get();
+
+ if (hits0 == 0)
+ return 0;
+
+ return (float) hits0 / gets0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return offHeapMisses.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ long misses0 = offHeapMisses.get();
+ long reads0 = offHeapGets.get();
+
+ if (misses0 == 0)
+ return 0;
+
+ return (float) misses0 / reads0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
- return cctx.cache().offHeapEntriesCount();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapEntriesCount() : -1;
}
/** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ try {
+ return cctx.swap().offheapEntriesCount(true, false, NONE);
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ try {
+ return cctx.swap().offheapEntriesCount(false, true, NONE);
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
- return cctx.cache().offHeapAllocatedSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapAllocatedSize() : -1;
}
/** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return cctx.config().getOffHeapMaxMemory();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return swapGets.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return swapPuts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return swapRemoves.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return swapHits.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return swapMisses.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ try {
+ return cctx.cache().swapKeys();
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ try {
+ return cctx.cache().swapSize();
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ long hits0 = swapHits.get();
+ long gets0 = swapGets.get();
+
+ if (hits0 == 0)
+ return 0;
+
+ return (float) hits0 / gets0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ long misses0 = swapMisses.get();
+ long reads0 = swapGets.get();
+
+ if (misses0 == 0)
+ return 0;
+
+ return (float) misses0 / reads0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
@Override public int getSize() {
- return cctx.cache().size();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.size() : 0;
}
/** {@inheritDoc} */
@@@ -606,11 -769,111 +797,113 @@@
/** {@inheritDoc} */
@Override public boolean isManagementEnabled() {
- return cctx.config().isManagementEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isManagementEnabled();
}
+ /**
+ * Off-heap read callback.
+ *
+ * @param hit Hit or miss flag.
+ */
+ public void onOffHeapRead(boolean hit) {
+ offHeapGets.incrementAndGet();
+
+ if (hit)
+ offHeapHits.incrementAndGet();
+ else
+ offHeapMisses.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRead(hit);
+ }
+
+ /**
+ * Off-heap write callback.
+ */
+ public void onOffHeapWrite() {
+ offHeapPuts.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapWrite();
+ }
+
+ /**
+ * Off-heap remove callback.
+ */
+ public void onOffHeapRemove() {
+ offHeapRemoves.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRemove();
+ }
+
+ /**
+ * Off-heap evict callback.
+ */
+ public void onOffHeapEvict() {
+ offHeapEvicts.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRemove();
+ }
+
+ /**
+ * Swap read callback.
+ *
+ * @param hit Hit or miss flag.
+ */
+ public void onSwapRead(boolean hit) {
+ swapGets.incrementAndGet();
+
+ if (hit)
+ swapHits.incrementAndGet();
+ else
+ swapMisses.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onSwapRead(hit);
+ }
+
+ /**
+ * Swap write callback.
+ */
+ public void onSwapWrite() {
+ onSwapWrite(1);
+ }
+
+ /**
+ * Swap write callback.
+ *
+ * @param cnt Amount of entries.
+ */
+ public void onSwapWrite(int cnt) {
+ swapPuts.addAndGet(cnt);
+
+ if (delegate != null)
+ delegate.onSwapWrite(cnt);
+ }
+
+ /**
+ * Swap remove callback.
+ */
+ public void onSwapRemove() {
+ onSwapRemove(1);
+ }
+
+ /**
+ * Swap remove callback.
+ *
+ * @param cnt Amount of entries.
+ */
+ public void onSwapRemove(int cnt) {
+ swapRemoves.addAndGet(cnt);
+
+ if (delegate != null)
+ delegate.onSwapRemove(cnt);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsImpl.class, this);
[10/10] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-5' into ignite-943
Posted by se...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-5' into ignite-943
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d10120d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d10120d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d10120d6
Branch: refs/heads/ignite-943
Commit: d10120d67d3d6e20b25e494b03b829b20f263b5b
Parents: d10fe3e 5c30f9c
Author: sevdokimov <se...@gridgain.com>
Authored: Fri May 29 18:57:20 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Fri May 29 18:57:20 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/CacheMetrics.java | 187 +++++++--
.../org/apache/ignite/igfs/IgfsUserContext.java | 119 ++++++
.../igfs/secondary/IgfsSecondaryFileSystem.java | 7 +
.../internal/igfs/common/IgfsMarshaller.java | 35 +-
.../igfs/common/IgfsPathControlRequest.java | 22 +
.../internal/managers/GridManagerAdapter.java | 59 +--
.../processors/cache/CacheMetricsImpl.java | 367 ++++++++++++++++-
.../cache/CacheMetricsMXBeanImpl.java | 100 +++++
.../processors/cache/CacheMetricsSnapshot.java | 380 +++++++++++++----
.../processors/cache/GridCacheAdapter.java | 12 +-
.../processors/cache/GridCacheSwapManager.java | 118 ++++--
.../internal/processors/hadoop/HadoopJob.java | 2 +-
.../ignite/internal/processors/igfs/IgfsEx.java | 8 +-
.../internal/processors/igfs/IgfsImpl.java | 8 +-
.../processors/igfs/IgfsIpcHandler.java | 184 +++++----
.../igfs/IgfsSecondaryFileSystemImpl.java | 9 +-
.../internal/processors/igfs/IgfsServer.java | 4 +-
.../internal/processors/igfs/IgfsUtils.java | 16 +
.../ignite/internal/util/GridJavaProcess.java | 30 +-
.../ignite/mxbean/CacheMetricsMXBean.java | 80 ++++
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 35 +-
.../org/apache/ignite/spi/IgniteSpiContext.java | 47 ---
.../spi/swapspace/file/FileSwapSpaceSpi.java | 8 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 412 +++++++++++++++++++
.../testframework/GridSpiTestContext.java | 25 +-
.../IgniteCacheMetricsSelfTestSuite.java | 1 +
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 +++++---
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 107 +++--
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 32 +-
.../internal/processors/hadoop/HadoopUtils.java | 10 +-
.../hadoop/SecondaryFileSystemProvider.java | 53 ++-
.../hadoop/fs/HadoopDistributedFileSystem.java | 91 ----
.../hadoop/fs/HadoopFileSystemsUtils.java | 17 -
.../hadoop/fs/HadoopLazyConcurrentMap.java | 204 +++++++++
.../processors/hadoop/igfs/HadoopIgfsEx.java | 6 +
.../hadoop/igfs/HadoopIgfsInProc.java | 170 ++++++--
.../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 2 +-
.../hadoop/igfs/HadoopIgfsOutProc.java | 33 +-
.../hadoop/igfs/HadoopIgfsWrapper.java | 19 +-
.../hadoop/v2/HadoopV2TaskContext.java | 4 +-
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 56 ++-
...oopSecondaryFileSystemConfigurationTest.java | 4 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 63 ++-
.../IgniteHadoopFileSystemClientSelfTest.java | 2 +-
.../IgniteHadoopFileSystemIpcCacheSelfTest.java | 2 +
.../hadoop/HadoopFileSystemsTest.java | 23 +-
.../collections/HadoopSkipListSelfTest.java | 4 +-
47 files changed, 2537 insertions(+), 805 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d10120d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d10120d6/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
[06/10] incubator-ignite git commit: ignite-866 NPE during clean up
Posted by se...@apache.org.
ignite-866 NPE during clean up
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2d9a938a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2d9a938a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2d9a938a
Branch: refs/heads/ignite-943
Commit: 2d9a938a281887fdae804ac68a89e93e5ad1b02b
Parents: 8455c7a
Author: agura <ag...@gridgain.com>
Authored: Thu May 14 14:40:38 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 29 16:43:38 2015 +0300
----------------------------------------------------------------------
.../processors/cache/CacheMetricsImpl.java | 62 +++++++++++++++-----
.../processors/cache/GridCacheAdapter.java | 12 +++-
2 files changed, 55 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d9a938a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..af19077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.store.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -118,7 +119,9 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public long getOverflowSize() {
try {
- return cctx.cache().overflowSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.overflowSize() : -1;
}
catch (IgniteCheckedException ignored) {
return -1;
@@ -127,34 +130,47 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
- return cctx.cache().offHeapEntriesCount();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapEntriesCount() : -1;
}
/** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
- return cctx.cache().offHeapAllocatedSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapAllocatedSize() : -1;
}
/** {@inheritDoc} */
@Override public int getSize() {
- return cctx.cache().size();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.size() : 0;
}
/** {@inheritDoc} */
@Override public int getKeySize() {
- return cctx.cache().size();
+ return getSize();
}
/** {@inheritDoc} */
@Override public boolean isEmpty() {
- return cctx.cache().isEmpty();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache == null || cache.isEmpty();
}
/** {@inheritDoc} */
@Override public int getDhtEvictQueueCurrentSize() {
- return cctx.isNear() ?
- dhtCtx != null ? dhtCtx.evicts().evictQueueSize() : -1
- : cctx.evicts().evictQueueSize();
+ GridCacheContext<?, ?> ctx = cctx.isNear() ? dhtCtx : cctx;
+
+ if (ctx == null)
+ return -1;
+
+ GridCacheEvictionManager evictMgr = ctx.evicts();
+
+ return evictMgr != null ? evictMgr.evictQueueSize() : -1;
}
/** {@inheritDoc} */
@@ -548,37 +564,51 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public String getKeyType() {
- return cctx.config().getKeyType().getName();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null ? ccfg.getKeyType().getName() : null;
}
/** {@inheritDoc} */
@Override public String getValueType() {
- return cctx.config().getValueType().getName();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null ? ccfg.getValueType().getName() : null;
}
/** {@inheritDoc} */
@Override public boolean isReadThrough() {
- return cctx.config().isReadThrough();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isReadThrough();
}
/** {@inheritDoc} */
@Override public boolean isWriteThrough() {
- return cctx.config().isWriteThrough();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isWriteThrough();
}
/** {@inheritDoc} */
@Override public boolean isStoreByValue() {
- return cctx.config().isStoreByValue();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isStoreByValue();
}
/** {@inheritDoc} */
@Override public boolean isStatisticsEnabled() {
- return cctx.config().isStatisticsEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isStatisticsEnabled();
}
/** {@inheritDoc} */
@Override public boolean isManagementEnabled() {
- return cctx.config().isManagementEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isManagementEnabled();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d9a938a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..c975961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3249,7 +3249,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long overflowSize() throws IgniteCheckedException {
- return ctx.swap().swapSize();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.swapSize() : -1;
}
/**
@@ -3802,12 +3804,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
- return ctx.swap().offHeapEntriesCount();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.offHeapEntriesCount() : -1;
}
/** {@inheritDoc} */
@Override public long offHeapAllocatedSize() {
- return ctx.swap().offHeapAllocatedSize();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;
}
/** {@inheritDoc} */
[08/10] incubator-ignite git commit: Merge branch 'ignite-sprint-5'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into
ignite-sprint-5
Posted by se...@apache.org.
Merge branch 'ignite-sprint-5' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6cd1a6e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6cd1a6e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6cd1a6e5
Branch: refs/heads/ignite-943
Commit: 6cd1a6e55880571e09785afc824e258070242284
Parents: 2d9a938 5df0668
Author: agura <ag...@gridgain.com>
Authored: Fri May 29 16:59:49 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 29 16:59:49 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/util/GridJavaProcess.java | 30 ++++++++++++--------
1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[04/10] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong
staging permissions while running MR job under hadoop accelerator): IGFS
part.
Posted by se...@apache.org.
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35388195
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35388195
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35388195
Branch: refs/heads/ignite-943
Commit: 353881951fcdcc16c3dc31d808d3af6c263f74ce
Parents: 7ec4c82
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:31:35 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:31:35 2015 +0300
----------------------------------------------------------------------
.../igfs/secondary/IgfsSecondaryFileSystem.java | 7 +
.../internal/igfs/common/IgfsMarshaller.java | 35 +---
.../igfs/common/IgfsPathControlRequest.java | 22 +++
.../internal/processors/hadoop/HadoopJob.java | 2 +-
.../ignite/internal/processors/igfs/IgfsEx.java | 8 +-
.../internal/processors/igfs/IgfsImpl.java | 8 +-
.../processors/igfs/IgfsIpcHandler.java | 184 ++++++++++---------
.../igfs/IgfsSecondaryFileSystemImpl.java | 9 +-
.../internal/processors/igfs/IgfsServer.java | 4 +-
.../internal/processors/igfs/IgfsUtils.java | 16 ++
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 ++++++++++++-----
.../hadoop/fs/v1/IgniteHadoopFileSystem.java | 107 +++++++----
.../hadoop/fs/v2/IgniteHadoopFileSystem.java | 32 +++-
.../internal/processors/hadoop/HadoopUtils.java | 10 +-
.../hadoop/SecondaryFileSystemProvider.java | 53 +++---
.../hadoop/fs/HadoopDistributedFileSystem.java | 91 ---------
.../hadoop/fs/HadoopFileSystemsUtils.java | 17 --
.../processors/hadoop/igfs/HadoopIgfsEx.java | 6 +
.../hadoop/igfs/HadoopIgfsInProc.java | 170 ++++++++++++-----
.../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 2 +-
.../hadoop/igfs/HadoopIgfsOutProc.java | 33 +++-
.../hadoop/igfs/HadoopIgfsWrapper.java | 19 +-
.../hadoop/v2/HadoopV2TaskContext.java | 4 +-
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 56 ++++--
...oopSecondaryFileSystemConfigurationTest.java | 4 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 63 +++++--
.../IgniteHadoopFileSystemClientSelfTest.java | 2 +-
.../IgniteHadoopFileSystemIpcCacheSelfTest.java | 2 +
.../hadoop/HadoopFileSystemsTest.java | 23 +--
.../collections/HadoopSkipListSelfTest.java | 4 +-
30 files changed, 684 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
* @return Map of properties.
*/
public Map<String,String> properties();
+
+
+ /**
+ * Closes the secondary file system.
+ * @throws IgniteException in case of an error.
+ */
+ public void close() throws IgniteException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
index 11af716..6a6f22a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java
@@ -73,6 +73,7 @@ public class IgfsMarshaller {
}
/**
+ * Serializes the message and sends it into the given output stream.
* @param msg Message.
* @param hdr Message header.
* @param out Output.
@@ -119,6 +120,7 @@ public class IgfsMarshaller {
IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+ U.writeString(out, req.userName());
writePath(out, req.path());
writePath(out, req.destinationPath());
out.writeBoolean(req.flag());
@@ -236,6 +238,7 @@ public class IgfsMarshaller {
case OPEN_CREATE: {
IgfsPathControlRequest req = new IgfsPathControlRequest();
+ req.userName(U.readString(in));
req.path(readPath(in));
req.destinationPath(readPath(in));
req.flag(in.readBoolean());
@@ -298,8 +301,6 @@ public class IgfsMarshaller {
}
}
- assert msg != null;
-
msg.command(cmd);
return msg;
@@ -341,34 +342,4 @@ public class IgfsMarshaller {
return null;
}
-
- /**
- * Writes string to output.
- *
- * @param out Data output.
- * @param str String.
- * @throws IOException If write failed.
- */
- private void writeString(DataOutput out, @Nullable String str) throws IOException {
- out.writeBoolean(str != null);
-
- if (str != null)
- out.writeUTF(str);
- }
-
- /**
- * Reads string from input.
- *
- * @param in Data input.
- * @return Read string.
- * @throws IOException If read failed.
- */
- @Nullable private String readString(DataInput in) throws IOException {
- boolean hasStr = in.readBoolean();
-
- if (hasStr)
- return in.readUTF();
-
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
index 7ed1619..2f6e6e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.igfs.common;
import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
@@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage {
/** Last modification time. */
private long modificationTime;
+ /** The user name this control request is made on behalf of. */
+ private String userName;
+
/**
* @param path Path.
*/
@@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage {
@Override public String toString() {
return S.toString(IgfsPathControlRequest.class, this, "cmd", command());
}
+
+ /**
+ * Getter for the user name.
+ * @return user name.
+ */
+ public final String userName() {
+ assert userName != null;
+
+ return userName;
+ }
+
+ /**
+ * Setter for the user name.
+ * @param userName the user name.
+ */
+ public final void userName(String userName) {
+ this.userName = IgfsUtils.fixUserName(userName);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
/**
* Cleans up the job staging directory.
*/
- void cleanupStagingDirectory();
+ public void cleanupStagingDirectory();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 7c1a837..361f75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
/** Property name for URI of file system. */
public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
- /** Property name for user name of file system. */
- public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
+ /** Property name for default user name of file system.
+ * NOTE: for secondary file system this is just a default user name, which is used
+ * when the 2ndary filesystem is used outside of any user context.
+ * If another user name is set in the context, 2ndary file system will work on behalf
+ * of that user, which is different from the default. */
+ public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
/**
* Stops IGFS cleaning all used resources.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 34636d2..c3495e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx {
for (IgfsFileWorkerBatch batch : workerMap.values())
batch.cancel();
- if (secondaryFs instanceof AutoCloseable)
- U.closeQuiet((AutoCloseable)secondaryFs);
+ try {
+ secondaryFs.close();
+ }
+ catch (Exception e) {
+ log.error("Failed to close secondary file system.", e);
+ }
}
igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..cfe6ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
private final int bufSize; // Buffer size. Must not be less then file block size.
/** IGFS instance for this handler. */
- private IgfsEx igfs;
+ private final IgfsEx igfs;
/** Resource ID generator. */
- private AtomicLong rsrcIdGen = new AtomicLong();
+ private final AtomicLong rsrcIdGen = new AtomicLong();
/** Stopping flag. */
private volatile boolean stopping;
@@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler {
* @return Response message.
* @throws IgniteCheckedException If failed.
*/
- private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+ private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
IgfsMessage msg) throws IgniteCheckedException {
- IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+ final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
if (log.isDebugEnabled())
log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
- IgfsControlResponse res = new IgfsControlResponse();
+ final IgfsControlResponse res = new IgfsControlResponse();
+
+ final String userName = req.userName();
+
+ assert userName != null;
try {
- switch (cmd) {
- case EXISTS:
- res.response(igfs.exists(req.path()));
+ IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+ @Override public Void apply() {
+ switch (cmd) {
+ case EXISTS:
+ res.response(igfs.exists(req.path()));
- break;
+ break;
- case INFO:
- res.response(igfs.info(req.path()));
+ case INFO:
+ res.response(igfs.info(req.path()));
- break;
+ break;
- case PATH_SUMMARY:
- res.response(igfs.summary(req.path()));
+ case PATH_SUMMARY:
+ res.response(igfs.summary(req.path()));
- break;
+ break;
- case UPDATE:
- res.response(igfs.update(req.path(), req.properties()));
+ case UPDATE:
+ res.response(igfs.update(req.path(), req.properties()));
- break;
+ break;
- case RENAME:
- igfs.rename(req.path(), req.destinationPath());
+ case RENAME:
+ igfs.rename(req.path(), req.destinationPath());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case DELETE:
- res.response(igfs.delete(req.path(), req.flag()));
+ case DELETE:
+ res.response(igfs.delete(req.path(), req.flag()));
- break;
+ break;
- case MAKE_DIRECTORIES:
- igfs.mkdirs(req.path(), req.properties());
+ case MAKE_DIRECTORIES:
+ igfs.mkdirs(req.path(), req.properties());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case LIST_PATHS:
- res.paths(igfs.listPaths(req.path()));
+ case LIST_PATHS:
+ res.paths(igfs.listPaths(req.path()));
- break;
+ break;
- case LIST_FILES:
- res.files(igfs.listFiles(req.path()));
+ case LIST_FILES:
+ res.files(igfs.listFiles(req.path()));
- break;
+ break;
- case SET_TIMES:
- igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+ case SET_TIMES:
+ igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case AFFINITY:
- res.locations(igfs.affinity(req.path(), req.start(), req.length()));
+ case AFFINITY:
+ res.locations(igfs.affinity(req.path(), req.start(), req.length()));
- break;
+ break;
- case OPEN_READ: {
- IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
- igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+ case OPEN_READ: {
+ IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+ igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
- long streamId = registerResource(ses, igfsIn);
+ long streamId = registerResource(ses, igfsIn);
- if (log.isDebugEnabled())
- log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
- igfsIn.fileInfo().modificationTime());
+ IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+ igfsIn.fileInfo().modificationTime());
- res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+ res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
- break;
- }
+ break;
+ }
- case OPEN_CREATE: {
- long streamId = registerResource(ses, igfs.create(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Overwrite if exists.
- affinityKey(req), // Affinity key based on replication factor.
- req.replication(),// Replication factor.
- req.blockSize(), // Block size.
- req.properties() // File properties.
- ));
+ case OPEN_CREATE: {
+ long streamId = registerResource(ses, igfs.create(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Overwrite if exists.
+ affinityKey(req), // Affinity key based on replication factor.
+ req.replication(),// Replication factor.
+ req.blockSize(), // Block size.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- case OPEN_APPEND: {
- long streamId = registerResource(ses, igfs.append(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Create if absent.
- req.properties() // File properties.
- ));
+ case OPEN_APPEND: {
+ long streamId = registerResource(ses, igfs.append(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Create if absent.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- default:
- assert false : "Unhandled path control request command: " + cmd;
+ default:
+ assert false : "Unhandled path control request command: " + cmd;
- break;
- }
+ break;
+ }
+
+ return null;
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..b8095b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
*/
class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
/** Delegate. */
- private final IgfsImpl igfs;
+ private final IgfsEx igfs;
/**
* Constructor.
*
* @param igfs Delegate.
*/
- IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+ IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
this.igfs = igfs;
}
@@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public Map<String, String> properties() {
return Collections.emptyMap();
}
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ igfs.stop(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
*/
private class ClientWorker extends GridWorker {
/** Connected client endpoint. */
- private IpcEndpoint endpoint;
+ private final IpcEndpoint endpoint;
/** Data output stream. */
private final IgfsDataOutputStream out;
/** Client session object. */
- private IgfsClientSession ses;
+ private final IgfsClientSession ses;
/** Queue node for fast unlink. */
private ConcurrentLinkedDeque8.Node<ClientWorker> node;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 4b0234f..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
import java.lang.reflect.*;
@@ -88,4 +90,18 @@ public class IgfsUtils {
private IgfsUtils() {
// No-op.
}
+
+ /**
+ * Provides non-null user name.
+ * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+ * which is the current process owner user.
+ * @param user a user name to be fixed.
+ * @return non-null interned user name.
+ */
+ public static String fixUserName(@Nullable String user) {
+ if (F.isEmpty(user))
+ user = FileSystemConfiguration.DFLT_USER_NAME;
+
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index ba891f8..6a630fb 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.igfs.secondary.*;
import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.internal.processors.hadoop.igfs.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.typedef.*;
import org.jetbrains.annotations.*;
+import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*;
import java.io.*;
import java.net.*;
@@ -37,15 +38,45 @@ import java.util.*;
import static org.apache.ignite.internal.processors.igfs.IgfsEx.*;
/**
- * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}.
+ * In fact, this class deals with different FileSystems depending on the user context,
+ * see {@link IgfsUserContext#currentUser()}.
*/
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable {
- /** Hadoop file system. */
- private final FileSystem fileSys;
-
- /** Properties of file system */
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem {
+ /** Properties of file system, see {@link #properties()}
+ *
+ * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH}
+ * See {@link IgfsEx#SECONDARY_FS_URI}
+ * See {@link IgfsEx#SECONDARY_FS_USER_NAME}
+ * */
private final Map<String, String> props = new HashMap<>();
+ /** Secondary file system provider. */
+ private final SecondaryFileSystemProvider secProvider;
+
+ /** The default user name. It is used if no user context is set. */
+ private final String dfltUserName;
+
+ /** FileSystem instance created for the default user.
+ * Stored outside the fileSysLazyMap due to performance reasons. */
+ private final FileSystem dfltFs;
+
+ /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */
+ private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>(
+ new ValueFactory<String, FileSystem>() {
+ @Override public FileSystem createValue(String key) {
+ try {
+ assert !F.isEmpty(key);
+
+ return secProvider.createFileSystem(key);
+ }
+ catch (IOException ioe) {
+ throw new IgniteException(ioe);
+ }
+ }
+ }
+ );
+
/**
* Simple constructor that is to be used by default.
*
@@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @throws IgniteCheckedException In case of error.
*/
public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath,
- @Nullable String userName)
- throws IgniteCheckedException {
+ @Nullable String userName) throws IgniteCheckedException {
// Treat empty uri and userName arguments as nulls to improve configuration usability:
if (F.isEmpty(uri))
uri = null;
@@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
if (F.isEmpty(userName))
userName = null;
+ this.dfltUserName = IgfsUtils.fixUserName(userName);
+
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName);
+ this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath);
- fileSys = secProvider.createFileSystem();
+ // File system creation for the default user name.
+ // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field:
+ this.dfltFs = secProvider.createFileSystem(dfltUserName);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
- uri = secProvider.uri().toString();
+ assert dfltFs != null;
- if (!uri.endsWith("/"))
- uri += "/";
+ uri = secProvider.uri().toString();
- if (cfgPath != null)
- props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
+ if (!uri.endsWith("/"))
+ uri += "/";
- if (userName != null)
- props.put(SECONDARY_FS_USER_NAME, userName);
+ if (cfgPath != null)
+ props.put(SECONDARY_FS_CONFIG_PATH, cfgPath);
- props.put(SECONDARY_FS_URI, uri);
- }
- catch (IOException e) {
- throw new IgniteCheckedException(e);
- }
+ props.put(SECONDARY_FS_URI, uri);
+ props.put(SECONDARY_FS_USER_NAME, dfltUserName);
}
/**
@@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @return Hadoop path.
*/
private Path convert(IgfsPath path) {
- URI uri = fileSys.getUri();
+ URI uri = fileSysForUser().getUri();
return new Path(uri.getScheme(), uri.getAuthority(), path.toString());
}
@@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
* @param detailMsg Detailed error message.
* @return Appropriate exception.
*/
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
private IgfsException handleSecondaryFsError(IOException e, String detailMsg) {
- boolean wrongVer = X.hasCause(e, RemoteException.class) ||
- (e.getMessage() != null && e.getMessage().contains("Failed on local"));
-
- return !wrongVer ? cast(detailMsg, e) :
- new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " +
- "version.", e); }
+ return cast(detailMsg, e);
+ }
/**
* Cast IO exception to IGFS exception.
@@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean exists(IgfsPath path) {
try {
- return fileSys.exists(convert(path));
+ return fileSysForUser().exists(convert(path));
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]");
@@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
+ final FileSystem fileSys = fileSysForUser();
+
try {
if (props0.userName() != null || props0.groupName() != null)
fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
@@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public void rename(IgfsPath src, IgfsPath dest) {
// Delegate to the secondary file system.
try {
- if (!fileSys.rename(convert(src), convert(dest)))
+ if (!fileSysForUser().rename(convert(src), convert(dest)))
throw new IgfsException("Failed to rename (secondary file system returned false) " +
"[src=" + src + ", dest=" + dest + ']');
}
@@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public boolean delete(IgfsPath path, boolean recursive) {
try {
- return fileSys.delete(convert(path), recursive);
+ return fileSysForUser().delete(convert(path), recursive);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]");
@@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path) {
try {
- if (!fileSys.mkdirs(convert(path)))
+ if (!fileSysForUser().mkdirs(convert(path)))
throw new IgniteException("Failed to make directories [path=" + path + "]");
}
catch (IOException e) {
@@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
try {
- if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
+ if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission()))
throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]");
}
catch (IOException e) {
@@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) {
try {
- FileStatus[] statuses = fileSys.listStatus(convert(path));
+ FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) {
try {
- FileStatus[] statuses = fileSys.listStatus(convert(path));
+ FileStatus[] statuses = fileSysForUser().listStatus(convert(path));
if (statuses == null)
throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path);
@@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) {
- return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize);
+ return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize);
}
/** {@inheritDoc} */
@Override public OutputStream create(IgfsPath path, boolean overwrite) {
try {
- return fileSys.create(convert(path), overwrite);
+ return fileSysForUser().create(convert(path), overwrite);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]");
@@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap());
try {
- return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize,
- null);
+ return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize,
+ (short)replication, blockSize, null);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props +
@@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
- return fileSys.append(convert(path), bufSize);
+ return fileSysForUser().append(convert(path), bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
/** {@inheritDoc} */
@Override public IgfsFile info(final IgfsPath path) {
try {
- final FileStatus status = fileSys.getFileStatus(convert(path));
+ final FileStatus status = fileSysForUser().getFileStatus(convert(path));
if (status == null)
return null;
@@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
try {
// We don't use FileSystem#getUsed() since it counts only the files
// in the filesystem root, not all the files recursively.
- return fileSys.getContentSummary(new Path("/")).getSpaceConsumed();
+ return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed();
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to get used space size of file system.");
@@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
}
/** {@inheritDoc} */
- @Nullable @Override public Map<String, String> properties() {
+ @Override public Map<String, String> properties() {
return props;
}
/** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
+ @Override public void close() throws IgniteException {
+ Exception e = null;
+
try {
- fileSys.close();
+ dfltFs.close();
}
- catch (IOException e) {
- throw new IgniteCheckedException(e);
+ catch (Exception e0) {
+ e = e0;
+ }
+
+ try {
+ fileSysLazyMap.close();
+ }
+ catch (IgniteCheckedException ice) {
+ if (e == null)
+ e = ice;
}
+
+ if (e != null)
+ throw new IgniteException(e);
}
/**
* Gets the underlying {@link FileSystem}.
+ * This method is used solely for testing.
* @return the underlying Hadoop {@link FileSystem}.
*/
public FileSystem fileSystem() {
- return fileSys;
+ return fileSysForUser();
+ }
+
+ /**
+ * Gets the FileSystem for the current context user.
+ * @return the FileSystem instance, never null.
+ */
+ private FileSystem fileSysForUser() {
+ String user = IgfsUserContext.currentUser();
+
+ if (F.isEmpty(user))
+ user = dfltUserName; // default is never empty.
+
+ assert !F.isEmpty(user);
+
+ if (F.eq(user, dfltUserName))
+ return dfltFs; // optimization
+
+ return fileSysLazyMap.getOrCreate(user);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index 1f53a06..c0a9ade 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
@@ -97,21 +98,8 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Grid remote client. */
private HadoopIgfsWrapper rmtClient;
- /** User name for each thread. */
- private final ThreadLocal<String> userName = new ThreadLocal<String>(){
- /** {@inheritDoc} */
- @Override protected String initialValue() {
- return DFLT_USER_NAME;
- }
- };
-
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){
- /** {@inheritDoc} */
- @Override protected Path initialValue() {
- return getHomeDirectory();
- }
- };
+ /** working directory. */
+ private Path workingDir;
/** Default replication factor. */
private short dfltReplication;
@@ -129,6 +117,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** Secondary URI string. */
private URI secondaryUri;
+ /** The user name this file system was created on behalf of. */
+ private String user;
+
/** IGFS mode resolver. */
private IgfsModeResolver modeRslvr;
@@ -182,6 +173,36 @@ public class IgniteHadoopFileSystem extends FileSystem {
}
/**
+ * Gets non-null and interned user name as per the Hadoop file system viewpoint.
+ * @return the user name, never null.
+ */
+ public static String getFsHadoopUser(Configuration cfg) throws IOException {
+ String user = null;
+
+ // -------------------------------------------
+ // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
+ // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
+ // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
+ // ugi.doAs() closure.
+ if (cfg != null)
+ user = cfg.get(MRJobConfig.USER_NAME);
+ // -------------------------------------------
+
+ if (user == null) {
+ UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();
+
+ if (currUgi != null)
+ user = currUgi.getShortUserName();
+ }
+
+ user = IgfsUtils.fixUserName(user);
+
+ assert user != null;
+
+ return user;
+ }
+
+ /**
* Public setter that can be used by direct users of FS or Visor.
*
* @param colocateFileWrites Whether all ongoing file writes should be colocated.
@@ -221,7 +242,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
uriAuthority = uri.getAuthority();
- setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+ user = getFsHadoopUser(cfg);
// Override sequential reads before prefetch if needed.
seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0);
@@ -244,7 +265,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
- rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+ rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
// Handshake.
IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -289,13 +310,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
String secUri = props.get(SECONDARY_FS_URI);
String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
- String secUserName = props.get(SECONDARY_FS_USER_NAME);
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
- secUserName);
+ SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+ secondaryFs = secProvider.createFileSystem(user);
- secondaryFs = secProvider.createFileSystem();
secondaryUri = secProvider.uri();
}
catch (IOException e) {
@@ -306,6 +326,9 @@ public class IgniteHadoopFileSystem extends FileSystem {
"will have no effect): " + e.getMessage());
}
}
+
+ // set working directory to the home directory of the current Fs user:
+ setWorkingDirectory(null);
}
finally {
leaveBusy();
@@ -849,22 +872,11 @@ public class IgniteHadoopFileSystem extends FileSystem {
/** {@inheritDoc} */
@Override public Path getHomeDirectory() {
- Path path = new Path("/user/" + userName.get());
+ Path path = new Path("/user/" + user);
return path.makeQualified(getUri(), null);
}
- /**
- * Set user name and default working directory for current thread.
- *
- * @param userName User name.
- */
- public void setUser(String userName) {
- this.userName.set(userName);
-
- setWorkingDirectory(null);
- }
-
/** {@inheritDoc} */
@Override public void setWorkingDirectory(Path newPath) {
if (newPath == null) {
@@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (secondaryFs != null)
secondaryFs.setWorkingDirectory(toSecondary(homeDir));
- workingDir.set(homeDir);
+ workingDir = homeDir;
}
else {
Path fixedNewPath = fixRelativePart(newPath);
@@ -886,13 +898,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
if (secondaryFs != null)
secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath));
- workingDir.set(fixedNewPath);
+ workingDir = fixedNewPath;
}
}
/** {@inheritDoc} */
@Override public Path getWorkingDirectory() {
- return workingDir.get();
+ return workingDir;
}
/** {@inheritDoc} */
@@ -1153,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
return null;
return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) :
- new IgfsPath(convert(workingDir.get()), path.toUri().getPath());
+ new IgfsPath(convert(workingDir), path.toUri().getPath());
}
/**
@@ -1191,9 +1203,16 @@ public class IgniteHadoopFileSystem extends FileSystem {
*/
@SuppressWarnings("deprecation")
private FileStatus convert(IgfsFile file) {
- return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(),
- file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file),
- file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"),
+ return new FileStatus(
+ file.length(),
+ file.isDirectory(),
+ getDefaultReplication(),
+ file.groupBlockSize(),
+ file.modificationTime(),
+ file.accessTime(),
+ permission(file),
+ file.property(PROP_USER_NAME, user),
+ file.property(PROP_GROUP_NAME, "users"),
convert(file.path())) {
@Override public String toString() {
return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() +
@@ -1247,4 +1266,12 @@ public class IgniteHadoopFileSystem extends FileSystem {
@Override public String toString() {
return S.toString(IgniteHadoopFileSystem.class, this);
}
+
+ /**
+ * Returns the user name this File System is created on behalf of.
+ * @return the user name
+ */
+ public String user() {
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 9cfb79b..f3fbe9c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
@@ -40,6 +39,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*;
import static org.apache.ignite.igfs.IgfsMode.*;
import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*;
@@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
/** Grid remote client. */
private HadoopIgfsWrapper rmtClient;
+ /** The name of the user this File System created on behalf of. */
+ private final String user;
+
/** Working directory. */
private IgfsPath workingDir;
/** URI. */
- private URI uri;
+ private final URI uri;
/** Authority. */
private String uriAuthority;
@@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
uri = name;
+ user = getFsHadoopUser(cfg);
+
try {
initialize(name, cfg);
}
@@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
throw e;
}
- workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+ workingDir = new IgfsPath("/user/" + user);
}
/** {@inheritDoc} */
@@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null;
- rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
+ rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user);
// Handshake.
IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
String secUri = props.get(SECONDARY_FS_URI);
String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH);
- String secUserName = props.get(SECONDARY_FS_USER_NAME);
try {
- SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath,
- secUserName);
+ SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath);
+
+ secondaryFs = secProvider.createAbstractFileSystem(user);
- secondaryFs = secProvider.createAbstractFileSystem();
secondaryUri = secProvider.uri();
}
catch (IOException e) {
@@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
file.modificationTime(),
file.accessTime(),
permission(file),
- file.property(PROP_USER_NAME, DFLT_USER_NAME),
+ file.property(PROP_USER_NAME, user),
file.property(PROP_GROUP_NAME, "users"),
convert(file.path())) {
@Override public String toString() {
@@ -983,4 +987,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea
@Override public String toString() {
return S.toString(IgniteHadoopFileSystem.class, this);
}
-}
+
+ /**
+ * Returns the user name this File System is created on behalf of.
+ * @return the user name
+ */
+ public String user() {
+ return user;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 00be422..d493bd4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -126,11 +126,15 @@ public class HadoopUtils {
break;
case PHASE_REDUCE:
- assert status.totalReducerCnt() > 0;
-
+ // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers?
+ // See https://issues.apache.org/jira/browse/IGNITE-764
setupProgress = 1;
mapProgress = 1;
- reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+ if (status.totalReducerCnt() > 0)
+ reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+ else
+ reduceProgress = 1f;
break;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index 27805f8..b1a057c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.security.*;
+import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jetbrains.annotations.*;
import java.io.*;
import java.net.*;
+import java.security.*;
/**
* Encapsulates logic of secondary filesystem creation.
@@ -36,9 +39,6 @@ public class SecondaryFileSystemProvider {
/** The secondary filesystem URI, never null. */
private final URI uri;
- /** Optional user name to log into secondary filesystem with. */
- private @Nullable final String userName;
-
/**
* Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be
* specified either explicitly or in the configuration provided.
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider {
* property in the provided configuration.
* @param secConfPath the secondary Fs path (file path on the local file system, optional).
* See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved.
- * @param userName User name.
* @throws IOException
*/
public SecondaryFileSystemProvider(final @Nullable String secUri,
- final @Nullable String secConfPath, @Nullable String userName) throws IOException {
- this.userName = userName;
-
+ final @Nullable String secConfPath) throws IOException {
if (secConfPath != null) {
URL url = U.resolveIgniteUrl(secConfPath);
@@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider {
* @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs.
* @throws IOException
*/
- public FileSystem createFileSystem() throws IOException {
+ public FileSystem createFileSystem(String userName) throws IOException {
+ userName = IgfsUtils.fixUserName(userName);
+
final FileSystem fileSys;
- if (userName == null)
- fileSys = FileSystem.get(uri, cfg);
- else {
- try {
- fileSys = FileSystem.get(uri, cfg, userName);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ try {
+ fileSys = FileSystem.get(uri, cfg, userName);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
- throw new IOException("Failed to create file system due to interrupt.", e);
- }
+ throw new IOException("Failed to create file system due to interrupt.", e);
}
return fileSys;
@@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider {
/**
* @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs.
- * @throws IOException
+ * @throws IOException in case of error.
*/
- public AbstractFileSystem createAbstractFileSystem() throws IOException {
- return AbstractFileSystem.get(uri, cfg);
+ public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException {
+ userName = IgfsUtils.fixUserName(userName);
+
+ String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName);
+
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() {
+ @Override public AbstractFileSystem run() throws IOException {
+ return AbstractFileSystem.get(uri, cfg);
+ }
+ });
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+
+ throw new IOException("Failed to create file system due to interrupt.", ie);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
deleted file mode 100644
index 509f443..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.ignite.configuration.FileSystemConfiguration.*;
-
-/**
- * Wrapper of HDFS for support of separated working directory.
- */
-public class HadoopDistributedFileSystem extends DistributedFileSystem {
- /** User name for each thread. */
- private final ThreadLocal<String> userName = new ThreadLocal<String>() {
- /** {@inheritDoc} */
- @Override protected String initialValue() {
- return DFLT_USER_NAME;
- }
- };
-
- /** Working directory for each thread. */
- private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
- /** {@inheritDoc} */
- @Override protected Path initialValue() {
- return getHomeDirectory();
- }
- };
-
- /** {@inheritDoc} */
- @Override public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
- }
-
- /**
- * Set user name and default working directory for current thread.
- *
- * @param userName User name.
- */
- public void setUser(String userName) {
- this.userName.set(userName);
-
- setWorkingDirectory(getHomeDirectory());
- }
-
- /** {@inheritDoc} */
- @Override public Path getHomeDirectory() {
- Path path = new Path("/user/" + userName.get());
-
- return path.makeQualified(getUri(), null);
- }
-
- /** {@inheritDoc} */
- @Override public void setWorkingDirectory(Path dir) {
- Path fixedDir = fixRelativePart(dir);
-
- String res = fixedDir.toUri().getPath();
-
- if (!DFSUtil.isValidName(res))
- throw new IllegalArgumentException("Invalid DFS directory name " + res);
-
- workingDir.set(fixedDir);
- }
-
- /** {@inheritDoc} */
- @Override public Path getWorkingDirectory() {
- return workingDir.get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
index f3f51d4..d90bc28 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
/**
* Utilities for configuring file systems to support the separate working directory per each thread.
@@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils {
public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
/**
- * Set user name and default working directory for current thread if it's supported by file system.
- *
- * @param fs File system.
- * @param userName User name.
- */
- public static void setUser(FileSystem fs, String userName) {
- if (fs instanceof IgniteHadoopFileSystem)
- ((IgniteHadoopFileSystem)fs).setUser(userName);
- else if (fs instanceof HadoopDistributedFileSystem)
- ((HadoopDistributedFileSystem)fs).setUser(userName);
- }
-
- /**
* Setup wrappers of filesystems to support the separate working directory.
*
* @param cfg Config for setup.
@@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils {
cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
HadoopLocalFileSystemV2.class.getName());
-
- cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
index 2f19226..b9c5113 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java
@@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs {
* @throws IOException If failed.
*/
public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+
+ /**
+ * The user this Igfs instance works on behalf of.
+ * @return the user name.
+ */
+ public String user();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index 44e531e..47ba0e8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.io.*;
@@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
/** Logger. */
private final Log log;
+ /** The user this Igfs works on behalf of. */
+ private final String user;
+
/**
* Constructor.
*
* @param igfs Target IGFS.
* @param log Log.
*/
- public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException {
+ this.user = IgfsUtils.fixUserName(userName);
+
this.igfs = igfs;
+
this.log = log;
bufSize = igfs.configuration().getBlockSize() * 2;
}
/** {@inheritDoc} */
- @Override public IgfsHandshakeResponse handshake(String logDir) {
- igfs.clientLogDirectory(logDir);
+ @Override public IgfsHandshakeResponse handshake(final String logDir) {
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() {
+ @Override public IgfsHandshakeResponse apply() {
+ igfs.clientLogDirectory(logDir);
- return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
- igfs.globalSampling());
+ return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(),
+ igfs.globalSampling());
+ }
+ });
}
/** {@inheritDoc} */
@@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+ @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.info(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+ @Override public IgfsFile apply() {
+ return igfs.info(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+ @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
try {
- return igfs.update(path, props);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() {
+ @Override public IgfsFile apply() {
+ return igfs.update(path, props);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
+ @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException {
try {
- igfs.setTimes(path, accessTime, modificationTime);
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.setTimes(path, accessTime, modificationTime);
+
+ return null;
+ }
+ });
return true;
}
@@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+ @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException {
try {
- igfs.rename(src, dest);
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.rename(src, dest);
+
+ return null;
+ }
+ });
return true;
}
@@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+ @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
try {
- return igfs.delete(path, recursive);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() {
+ @Override public Boolean apply() {
+ return igfs.delete(path, recursive);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
/** {@inheritDoc} */
@Override public IgfsStatus fsStatus() throws IgniteCheckedException {
try {
- return igfs.globalSpace();
+ return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() {
+ @Override public IgfsStatus call() throws IgniteCheckedException {
+ return igfs.globalSpace();
+ }
+ });
}
catch (IllegalStateException e) {
throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
"stopping.");
}
+ catch (IgniteCheckedException | RuntimeException | Error e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new AssertionError("Must never go there.");
+ }
}
/** {@inheritDoc} */
- @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+ @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.listPaths(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() {
+ @Override public Collection<IgfsPath> apply() {
+ return igfs.listPaths(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+ @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.listFiles(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() {
+ @Override public Collection<IgfsFile> apply() {
+ return igfs.listFiles(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+ @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
try {
- igfs.mkdirs(path, props);
+ IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
+ @Override public Void apply() {
+ igfs.mkdirs(path, props);
+
+ return null;
+ }
+ });
return true;
}
@@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+ @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException {
try {
- return igfs.summary(path);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() {
+ @Override public IgfsPathSummary apply() {
+ return igfs.summary(path);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+ @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len)
throws IgniteCheckedException {
try {
- return igfs.affinity(path, start, len);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() {
+ @Override public Collection<IgfsBlockLocation> apply() {
+ return igfs.affinity(path, start, len);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException {
try {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
- return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+ @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
throws IgniteCheckedException {
try {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
- return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
- int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+ @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate,
+ final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException {
try {
- IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
- colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+ colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
- return new HadoopIgfsStreamDelegate(this, stream);
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
}
/** {@inheritDoc} */
- @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
- @Nullable Map<String, String> props) throws IgniteCheckedException {
+ @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
+ final @Nullable Map<String, String> props) throws IgniteCheckedException {
try {
- IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+ return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
+ @Override public HadoopIgfsStreamDelegate apply() {
+ IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
- return new HadoopIgfsStreamDelegate(this, stream);
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream);
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
@@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
if (lsnr0 != null && log.isDebugEnabled())
log.debug("Removed stream event listener [delegate=" + delegate + ']');
}
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
index 0264e7b..3561e95 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.*;
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
public class HadoopIgfsIpcIo implements HadoopIgfsIo {
/** Logger. */
- private Log log;
+ private final Log log;
/** Request futures map. */
private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =
[07/10] incubator-ignite git commit: #gg-10369: Fix exception when we
start ignite and gridgain nodes.
Posted by se...@apache.org.
#gg-10369: Fix exception when we start ignite and gridgain nodes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5df06682
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5df06682
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5df06682
Branch: refs/heads/ignite-943
Commit: 5df06682c517731b3811ca4d0daabaa504b732f3
Parents: 8455c7a
Author: ivasilinets <iv...@gridgain.com>
Authored: Fri May 29 16:57:30 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Fri May 29 16:57:30 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/util/GridJavaProcess.java | 30 ++++++++++++--------
1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5df06682/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index bff26ec..42fe089 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -128,25 +128,31 @@ public final class GridJavaProcess {
gjProc.log = log;
gjProc.procKilledC = procKilledC;
- String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
- String classpath = System.getProperty("java.class.path");
- String sfcp = System.getProperty("surefire.test.class.path");
-
- if (sfcp != null)
- classpath += System.getProperty("path.separator") + sfcp;
-
- if (cp != null)
- classpath += System.getProperty("path.separator") + cp;
-
List<String> procParams = params == null || params.isEmpty() ?
Collections.<String>emptyList() : Arrays.asList(params.split(" "));
List<String> procCommands = new ArrayList<>();
+ String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+
procCommands.add(javaBin);
procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);
- procCommands.add("-cp");
- procCommands.add(classpath);
+
+ if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) {
+ String classpath = System.getProperty("java.class.path");
+
+ String sfcp = System.getProperty("surefire.test.class.path");
+
+ if (sfcp != null)
+ classpath += System.getProperty("path.separator") + sfcp;
+
+ if (cp != null)
+ classpath += System.getProperty("path.separator") + cp;
+
+ procCommands.add("-cp");
+ procCommands.add(classpath);
+ }
+
procCommands.add(clsName);
procCommands.addAll(procParams);
[02/10] incubator-ignite git commit: ignite-37 Improve offheap
metrics for cache
Posted by se...@apache.org.
ignite-37 Improve offheap metrics for cache
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3f012b77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3f012b77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3f012b77
Branch: refs/heads/ignite-943
Commit: 3f012b77699405c06e5014ffe5ba4a7d4e7ab5f3
Parents: 036d68d
Author: agura <ag...@gridgain.com>
Authored: Thu Apr 30 20:53:53 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed May 27 15:02:39 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/cache/CacheMetrics.java | 187 +++++++--
.../internal/managers/GridManagerAdapter.java | 59 +--
.../processors/cache/CacheMetricsImpl.java | 305 +++++++++++++-
.../cache/CacheMetricsMXBeanImpl.java | 100 +++++
.../processors/cache/CacheMetricsSnapshot.java | 380 +++++++++++++----
.../processors/cache/GridCacheSwapManager.java | 118 ++++--
.../ignite/mxbean/CacheMetricsMXBean.java | 80 ++++
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 35 +-
.../org/apache/ignite/spi/IgniteSpiContext.java | 47 ---
.../spi/swapspace/file/FileSwapSpaceSpi.java | 8 +-
...CacheLocalOffHeapAndSwapMetricsSelfTest.java | 412 +++++++++++++++++++
.../testframework/GridSpiTestContext.java | 25 +-
.../IgniteCacheMetricsSelfTestSuite.java | 1 +
13 files changed, 1457 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 0d87326..799aace 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -30,21 +30,21 @@ public interface CacheMetrics {
/**
* The number of get requests that were satisfied by the cache.
*
- * @return the number of hits
+ * @return The number of hits.
*/
public long getCacheHits();
/**
* This is a measure of cache efficiency.
*
- * @return the percentage of successful hits, as a decimal e.g 75.
+ * @return The percentage of successful hits, as a decimal e.g 75.
*/
public float getCacheHitPercentage();
/**
* A miss is a get request that is not satisfied.
*
- * @return the number of misses
+ * @return The number of misses.
*/
public long getCacheMisses();
@@ -52,7 +52,7 @@ public interface CacheMetrics {
* Returns the percentage of cache accesses that did not find a requested entry
* in the cache.
*
- * @return the percentage of accesses that failed to find anything
+ * @return The percentage of accesses that failed to find anything.
*/
public float getCacheMissPercentage();
@@ -60,14 +60,14 @@ public interface CacheMetrics {
* The total number of requests to the cache. This will be equal to the sum of
* the hits and misses.
*
- * @return the number of gets
+ * @return The number of gets.
*/
public long getCacheGets();
/**
* The total number of puts to the cache.
*
- * @return the number of puts
+ * @return The number of puts.
*/
public long getCachePuts();
@@ -75,7 +75,7 @@ public interface CacheMetrics {
* The total number of removals from the cache. This does not include evictions,
* where the cache itself initiates the removal to make space.
*
- * @return the number of removals
+ * @return The number of removals.
*/
public long getCacheRemovals();
@@ -84,28 +84,28 @@ public interface CacheMetrics {
* initiated by the cache itself to free up space. An eviction is not treated as
* a removal and does not appear in the removal counts.
*
- * @return the number of evictions
+ * @return The number of evictions.
*/
public long getCacheEvictions();
/**
* The mean time to execute gets.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAverageGetTime();
/**
* The mean time to execute puts.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAveragePutTime();
/**
* The mean time to execute removes.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAverageRemoveTime();
@@ -113,7 +113,7 @@ public interface CacheMetrics {
/**
* The mean time to execute tx commit.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAverageTxCommitTime();
@@ -124,7 +124,6 @@ public interface CacheMetrics {
*/
public float getAverageTxRollbackTime();
-
/**
* Gets total number of transaction commits.
*
@@ -154,6 +153,62 @@ public interface CacheMetrics {
public long getOverflowSize();
/**
+ * The total number of get requests to the off-heap memory.
+ *
+ * @return The number of gets.
+ */
+ public long getOffHeapGets();
+
+ /**
+ * The total number of put requests to the off-heap memory.
+ *
+ * @return The number of puts.
+ */
+ public long getOffHeapPuts();
+
+ /**
+ * The total number of removals from the off-heap memory. This does not include evictions.
+ *
+ * @return The number of removals.
+ */
+ public long getOffHeapRemovals();
+
+ /**
+ * The total number of evictions from the off-heap memory.
+ *
+ * @return The number of evictions.
+ */
+ public long getOffHeapEvictions();
+
+ /**
+ * The number of get requests that were satisfied by the off-heap memory.
+ *
+ * @return The off-heap hits number.
+ */
+ public long getOffHeapHits();
+
+ /**
+ * Gets the percentage of hits on off-heap memory.
+ *
+ * @return The percentage of hits on off-heap memory.
+ */
+ public float getOffHeapHitPercentage();
+
+ /**
+ * A miss is a get request that is not satisfied by off-heap memory.
+ *
+ * @return The off-heap misses number.
+ */
+ public long getOffHeapMisses();
+
+ /**
+ * Gets the percentage of misses on off-heap memory.
+ *
+ * @return The percentage of misses on off-heap memory.
+ */
+ public float getOffHeapMissPercentage();
+
+ /**
* Gets number of entries stored in off-heap memory.
*
* @return Number of entries stored in off-heap memory.
@@ -161,6 +216,20 @@ public interface CacheMetrics {
public long getOffHeapEntriesCount();
/**
+ * Gets number of primary entries stored in off-heap memory.
+ *
+ * @return Number of primary entries stored in off-heap memory.
+ */
+ public long getOffHeapPrimaryEntriesCount();
+
+ /**
+ * Gets number of backup entries stored in off-heap memory.
+ *
+ * @return Number of backup entries stored in off-heap memory.
+ */
+ public long getOffHeapBackupEntriesCount();
+
+ /**
* Gets memory size allocated in off-heap.
*
* @return Memory size allocated in off-heap.
@@ -168,6 +237,76 @@ public interface CacheMetrics {
public long getOffHeapAllocatedSize();
/**
+ * Gets off-heap memory maximum size.
+ *
+ * @return Off-heap memory maximum size.
+ */
+ public long getOffHeapMaxSize();
+
+ /**
+ * The total number of get requests to the swap.
+ *
+ * @return The number of gets.
+ */
+ public long getSwapGets();
+
+ /**
+ * The total number of put requests to the swap.
+ *
+ * @return The number of puts.
+ */
+ public long getSwapPuts();
+
+ /**
+ * The total number of removals from the swap.
+ *
+ * @return The number of removals.
+ */
+ public long getSwapRemovals();
+
+ /**
+ * The number of get requests that were satisfied by the swap.
+ *
+ * @return The swap hits number.
+ */
+ public long getSwapHits();
+
+ /**
+ * A miss is a get request that is not satisfied by swap.
+ *
+ * @return The swap misses number.
+ */
+ public long getSwapMisses();
+
+ /**
+ * Gets number of entries stored in swap.
+ *
+ * @return Number of entries stored in swap.
+ */
+ public long getSwapEntriesCount();
+
+ /**
+ * Gets size of swap.
+ *
+ * @return Size of swap.
+ */
+ public long getSwapSize();
+
+ /**
+ * Gets the percentage of hits on swap.
+ *
+ * @return The percentage of hits on swap.
+ */
+ public float getSwapHitPercentage();
+
+ /**
+ * Gets the percentage of misses on swap.
+ *
+ * @return The percentage of misses on swap.
+ */
+ public float getSwapMissPercentage();
+
+ /**
* Gets number of non-{@code null} values in the cache.
*
* @return Number of non-{@code null} values in the cache.
@@ -184,7 +323,7 @@ public interface CacheMetrics {
/**
* Returns {@code true} if this cache is empty.
*
- * @return {@code true} if this cache is empty.
+ * @return {@code True} if this cache is empty.
*/
public boolean isEmpty();
@@ -294,7 +433,7 @@ public interface CacheMetrics {
public int getTxDhtRolledbackVersionsSize();
/**
- * Returns {@code True} if write-behind is enabled.
+ * Returns {@code true} if write-behind is enabled.
*
* @return {@code True} if write-behind is enabled.
*/
@@ -372,16 +511,16 @@ public interface CacheMetrics {
/**
* Determines the required type of keys for this {@link Cache}, if any.
*
- * @return the fully qualified class name of the key type,
- * or "java.lang.Object" if the type is undefined.
+ * @return The fully qualified class name of the key type,
+ * or {@code "java.lang.Object"} if the type is undefined.
*/
public String getKeyType();
/**
* Determines the required type of values for this {@link Cache}, if any.
*
- * @return the fully qualified class name of the value type,
- * or "java.lang.Object" if the type is undefined.
+ * @return The fully qualified class name of the value type,
+ * or {@code "java.lang.Object"} if the type is undefined.
*/
public String getValueType();
@@ -407,7 +546,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code true}.
*
- * @return true if the cache is store by value
+ * @return {@code True} if the cache is store by value.
*/
public boolean isStoreByValue();
@@ -416,7 +555,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}.
*
- * @return true if statistics collection is enabled
+ * @return {@code True} if statistics collection is enabled.
*/
public boolean isStatisticsEnabled();
@@ -425,7 +564,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}.
*
- * @return true if management is enabled
+ * @return {@code True} if management is enabled.
*/
public boolean isManagementEnabled();
@@ -434,7 +573,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}
*
- * @return {@code true} when a {@link Cache} is in
+ * @return {@code True} when a {@link Cache} is in
* "read-through" mode.
* @see CacheLoader
*/
@@ -448,7 +587,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}
*
- * @return {@code true} when a {@link Cache} is in "write-through" mode.
+ * @return {@code True} when a {@link Cache} is in "write-through" mode.
* @see CacheWriter
*/
public boolean isWriteThrough();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index c93c059..1eb7143 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,7 +23,6 @@ import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -31,7 +30,7 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.spi.*;
-import org.apache.ignite.spi.swapspace.*;
+
import org.jetbrains.annotations.*;
import javax.cache.expiry.*;
@@ -439,46 +438,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
return ctx.cache().cache(cacheName).containsKey(key);
}
- @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
- @Nullable ClassLoader ldr) {
- assert ctx.swap().enabled();
-
- try {
- ctx.swap().write(spaceName, key, val, ldr);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- @SuppressWarnings({"unchecked"})
- @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key,
- @Nullable ClassLoader ldr) {
- try {
- assert ctx.swap().enabled();
-
- return ctx.swap().readValue(spaceName, key, ldr);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
@Override public int partition(String cacheName, Object key) {
return ctx.cache().cache(cacheName).affinity().partition(key);
}
- @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
- try {
- assert ctx.swap().enabled();
-
- ctx.swap().remove(spaceName, key, null, ldr);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
@Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
for (GridComponent comp : ctx) {
IgniteNodeValidationResult err = comp.validateNode(node);
@@ -508,26 +471,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
}
}
- @SuppressWarnings("unchecked")
- @Nullable @Override public <V> V readValueFromOffheapAndSwap(@Nullable String spaceName,
- Object key, @Nullable ClassLoader ldr) {
- try {
- IgniteInternalCache<Object, V> cache = ctx.cache().cache(spaceName);
-
- GridCacheContext cctx = cache.context();
-
- if (cctx.isNear())
- cctx = cctx.near().dht().context();
-
- GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
-
- return e != null ? CU.<V>value(e.value(), cctx, true) : null;
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
@Override public MessageFormatter messageFormatter() {
return ctx.io().formatter();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..3dcda3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+
/**
* Adapter for cache metrics.
*/
@@ -63,7 +65,7 @@ public class CacheMetricsImpl implements CacheMetrics {
private AtomicLong getTimeNanos = new AtomicLong();
/** Remove time taken nanos. */
- private AtomicLong removeTimeNanos = new AtomicLong();
+ private AtomicLong rmvTimeNanos = new AtomicLong();
/** Commit transaction time taken nanos. */
private AtomicLong commitTimeNanos = new AtomicLong();
@@ -71,6 +73,39 @@ public class CacheMetricsImpl implements CacheMetrics {
/** Commit transaction time taken nanos. */
private AtomicLong rollbackTimeNanos = new AtomicLong();
+ /** Number of reads from off-heap memory. */
+ private AtomicLong offHeapGets = new AtomicLong();
+
+ /** Number of writes to off-heap memory. */
+ private AtomicLong offHeapPuts = new AtomicLong();
+
+ /** Number of removed entries from off-heap memory. */
+ private AtomicLong offHeapRemoves = new AtomicLong();
+
+ /** Number of evictions from off-heap memory. */
+ private AtomicLong offHeapEvicts = new AtomicLong();
+
+ /** Number of off-heap hits. */
+ private AtomicLong offHeapHits = new AtomicLong();
+
+ /** Number of off-heap misses. */
+ private AtomicLong offHeapMisses = new AtomicLong();
+
+ /** Number of reads from swap. */
+ private AtomicLong swapGets = new AtomicLong();
+
+ /** Number of writes to swap. */
+ private AtomicLong swapPuts = new AtomicLong();
+
+ /** Number of removed entries from swap. */
+ private AtomicLong swapRemoves = new AtomicLong();
+
+ /** Number of swap hits. */
+ private AtomicLong swapHits = new AtomicLong();
+
+ /** Number of swap misses. */
+ private AtomicLong swapMisses = new AtomicLong();
+
/** Cache metrics. */
@GridToStringExclude
private transient CacheMetricsImpl delegate;
@@ -126,16 +161,160 @@ public class CacheMetricsImpl implements CacheMetrics {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return offHeapGets.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return offHeapPuts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return offHeapRemoves.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return offHeapEvicts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return offHeapHits.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ long hits0 = offHeapHits.get();
+ long gets0 = offHeapGets.get();
+
+ if (hits0 == 0)
+ return 0;
+
+ return (float) hits0 / gets0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return offHeapMisses.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ long misses0 = offHeapMisses.get();
+ long reads0 = offHeapGets.get();
+
+ if (misses0 == 0)
+ return 0;
+
+ return (float) misses0 / reads0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
return cctx.cache().offHeapEntriesCount();
}
/** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ try {
+ return cctx.swap().offheapEntriesCount(true, false, NONE);
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ try {
+ return cctx.swap().offheapEntriesCount(false, true, NONE);
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
return cctx.cache().offHeapAllocatedSize();
}
/** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return cctx.config().getOffHeapMaxMemory();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return swapGets.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return swapPuts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return swapRemoves.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return swapHits.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return swapMisses.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ try {
+ return cctx.cache().swapKeys();
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ try {
+ return cctx.cache().swapSize();
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ long hits0 = swapHits.get();
+ long gets0 = swapGets.get();
+
+ if (hits0 == 0)
+ return 0;
+
+ return (float) hits0 / gets0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ long misses0 = swapMisses.get();
+ long reads0 = swapGets.get();
+
+ if (misses0 == 0)
+ return 0;
+
+ return (float) misses0 / reads0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
@Override public int getSize() {
return cctx.cache().size();
}
@@ -317,11 +496,24 @@ public class CacheMetricsImpl implements CacheMetrics {
txCommits.set(0);
txRollbacks.set(0);
putTimeNanos.set(0);
- removeTimeNanos.set(0);
+ rmvTimeNanos.set(0);
getTimeNanos.set(0);
commitTimeNanos.set(0);
rollbackTimeNanos.set(0);
+ offHeapGets.set(0);
+ offHeapPuts.set(0);
+ offHeapRemoves.set(0);
+ offHeapHits.set(0);
+ offHeapMisses.set(0);
+ offHeapEvicts.set(0);
+
+ swapGets.set(0);
+ swapPuts.set(0);
+ swapRemoves.set(0);
+ swapHits.set(0);
+ swapMisses.set(0);
+
if (delegate != null)
delegate.clear();
}
@@ -402,7 +594,7 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public float getAverageRemoveTime() {
- long timeNanos = removeTimeNanos.get();
+ long timeNanos = rmvTimeNanos.get();
long removesCnt = rmCnt.get();
if (timeNanos == 0 || removesCnt == 0)
@@ -483,7 +675,6 @@ public class CacheMetricsImpl implements CacheMetrics {
delegate.onTxRollback(duration);
}
-
/**
* Increments the get time accumulator.
*
@@ -514,7 +705,7 @@ public class CacheMetricsImpl implements CacheMetrics {
* @param duration the time taken in nanoseconds.
*/
public void addRemoveTimeNanos(long duration) {
- removeTimeNanos.addAndGet(duration);
+ rmvTimeNanos.addAndGet(duration);
if (delegate != null)
delegate.addRemoveTimeNanos(duration);
@@ -526,7 +717,7 @@ public class CacheMetricsImpl implements CacheMetrics {
* @param duration the time taken in nanoseconds.
*/
public void addRemoveAndGetTimeNanos(long duration) {
- removeTimeNanos.addAndGet(duration);
+ rmvTimeNanos.addAndGet(duration);
getTimeNanos.addAndGet(duration);
if (delegate != null)
@@ -581,6 +772,108 @@ public class CacheMetricsImpl implements CacheMetrics {
return cctx.config().isManagementEnabled();
}
+ /**
+ * Off-heap read callback.
+ *
+ * @param hit Hit or miss flag.
+ */
+ public void onOffHeapRead(boolean hit) {
+ offHeapGets.incrementAndGet();
+
+ if (hit)
+ offHeapHits.incrementAndGet();
+ else
+ offHeapMisses.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRead(hit);
+ }
+
+ /**
+ * Off-heap write callback.
+ */
+ public void onOffHeapWrite() {
+ offHeapPuts.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapWrite();
+ }
+
+ /**
+ * Off-heap remove callback.
+ */
+ public void onOffHeapRemove() {
+ offHeapRemoves.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRemove();
+ }
+
+ /**
+ * Off-heap evict callback. Increments the off-heap evictions counter and
+ * forwards the event to the delegate metrics, if one is configured.
+ */
+ public void onOffHeapEvict() {
+ offHeapEvicts.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapEvict(); // Was delegate.onOffHeapRemove(): copy-paste bug that counted evictions as removals on the delegate.
+ }
+
+ /**
+ * Swap read callback.
+ *
+ * @param hit Hit or miss flag.
+ */
+ public void onSwapRead(boolean hit) {
+ swapGets.incrementAndGet();
+
+ if (hit)
+ swapHits.incrementAndGet();
+ else
+ swapMisses.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onSwapRead(hit);
+ }
+
+ /**
+ * Swap write callback.
+ */
+ public void onSwapWrite() {
+ onSwapWrite(1);
+ }
+
+ /**
+ * Swap write callback.
+ *
+ * @param cnt Amount of entries.
+ */
+ public void onSwapWrite(int cnt) {
+ swapPuts.addAndGet(cnt);
+
+ if (delegate != null)
+ delegate.onSwapWrite(cnt);
+ }
+
+ /**
+ * Swap remove callback.
+ */
+ public void onSwapRemove() {
+ onSwapRemove(1);
+ }
+
+ /**
+ * Swap remove callback.
+ *
+ * @param cnt Amount of entries.
+ */
+ public void onSwapRemove(int cnt) {
+ swapRemoves.addAndGet(cnt);
+
+ if (delegate != null)
+ delegate.onSwapRemove(cnt);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheMetricsImpl.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
index e9d547c..966027a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
@@ -49,16 +49,116 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return cache.metrics0().getOffHeapGets();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return cache.metrics0().getOffHeapPuts();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return cache.metrics0().getOffHeapRemovals();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return cache.metrics0().getOffHeapEvictions();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return cache.metrics0().getOffHeapHits();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ return cache.metrics0().getOffHeapHitPercentage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return cache.metrics0().getOffHeapMisses();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ return cache.metrics0().getOffHeapMissPercentage();
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
return cache.metrics0().getOffHeapEntriesCount();
}
/** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ return cache.metrics0().getOffHeapPrimaryEntriesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ return cache.metrics0().getOffHeapBackupEntriesCount();
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
return cache.metrics0().getOffHeapAllocatedSize();
}
/** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return cache.metrics0().getOffHeapMaxSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return cache.metrics0().getSwapGets();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return cache.metrics0().getSwapPuts();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return cache.metrics0().getSwapRemovals();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return cache.metrics0().getSwapHits();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return cache.metrics0().getSwapMisses();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ return cache.metrics0().getSwapHitPercentage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ return cache.metrics0().getSwapMissPercentage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ return cache.metrics0().getSwapEntriesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ return cache.metrics0().getSwapSize();
+ }
+
+ /** {@inheritDoc} */
@Override public int getSize() {
return cache.metrics0().getSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 4fe152a..cf16d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -61,7 +61,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private float getAvgTimeNanos = 0;
/** Remove time taken nanos. */
- private float removeAvgTimeNanos = 0;
+ private float rmvAvgTimeNanos = 0;
/** Commit transaction time taken nanos. */
private float commitAvgTimeNanos = 0;
@@ -75,12 +75,60 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** Number of entries that was swapped to disk. */
private long overflowSize;
+ /** Number of reads from off-heap. */
+ private long offHeapGets;
+
+ /** Number of writes to off-heap. */
+ private long offHeapPuts;
+
+ /** Number of removed entries from off-heap. */
+ private long offHeapRemoves;
+
+ /** Number of evictions from off-heap. */
+ private long offHeapEvicts;
+
+ /** Off-heap hits number. */
+ private long offHeapHits;
+
+ /** Off-heap misses number. */
+ private long offHeapMisses;
+
/** Number of entries stored in off-heap memory. */
- private long offHeapEntriesCount;
+ private long offHeapEntriesCnt;
+
+ /** Number of primary entries stored in off-heap memory. */
+ private long offHeapPrimaryEntriesCnt;
+
+ /** Number of backup entries stored in off-heap memory. */
+ private long offHeapBackupEntriesCnt;
/** Memory size allocated in off-heap. */
private long offHeapAllocatedSize;
+ /** Off-heap memory maximum size. */
+ private long offHeapMaxSize;
+
+ /** Number of reads from swap. */
+ private long swapGets;
+
+ /** Number of writes to swap. */
+ private long swapPuts;
+
+ /** Number of removed entries from swap. */
+ private long swapRemoves;
+
+ /** Number of entries stored in swap. */
+ private long swapEntriesCnt;
+
+ /** Swap hits number. */
+ private long swapHits;
+
+ /** Swap misses number. */
+ private long swapMisses;
+
+ /** Swap size. */
+ private long swapSize;
+
/** Number of non-{@code null} values in the cache. */
private int size;
@@ -91,7 +139,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private boolean isEmpty;
/** Gets current size of evict queue used to batch up evictions. */
- private int dhtEvictQueueCurrentSize;
+ private int dhtEvictQueueCurrSize;
/** Transaction per-thread map size. */
private int txThreadMapSize;
@@ -106,7 +154,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private int txPrepareQueueSize;
/** Start version counts map size. */
- private int txStartVersionCountsSize;
+ private int txStartVerCountsSize;
/** Number of cached committed transaction IDs. */
private int txCommittedVersionsSize;
@@ -127,7 +175,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private int txDhtPrepareQueueSize;
/** DHT start version counts map size. */
- private int txDhtStartVersionCountsSize;
+ private int txDhtStartVerCountsSize;
/** Number of cached committed DHT transaction IDs. */
private int txDhtCommittedVersionsSize;
@@ -142,34 +190,34 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private int writeBehindFlushSize;
/** Count of worker threads. */
- private int writeBehindFlushThreadCount;
+ private int writeBehindFlushThreadCnt;
/** Flush frequency in milliseconds. */
- private long writeBehindFlushFrequency;
+ private long writeBehindFlushFreq;
/** Maximum size of batch. */
private int writeBehindStoreBatchSize;
/** Count of cache overflow events since start. */
- private int writeBehindTotalCriticalOverflowCount;
+ private int writeBehindTotalCriticalOverflowCnt;
/** Count of cache overflow events since start. */
- private int writeBehindCriticalOverflowCount;
+ private int writeBehindCriticalOverflowCnt;
/** Count of entries in store-retry state. */
- private int writeBehindErrorRetryCount;
+ private int writeBehindErrorRetryCnt;
/** Total count of entries in cache store internal buffer. */
- private int writeBehindBufferSize;
+ private int writeBehindBufSize;
/** */
private String keyType;
/** */
- private String valueType;
+ private String valType;
/** */
- private boolean isStoreByValue;
+ private boolean isStoreByVal;
/** */
private boolean isStatisticsEnabled;
@@ -207,45 +255,64 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
putAvgTimeNanos = m.getAveragePutTime();
getAvgTimeNanos = m.getAverageGetTime();
- removeAvgTimeNanos = m.getAverageRemoveTime();
+ rmvAvgTimeNanos = m.getAverageRemoveTime();
commitAvgTimeNanos = m.getAverageTxCommitTime();
rollbackAvgTimeNanos = m.getAverageTxRollbackTime();
cacheName = m.name();
overflowSize = m.getOverflowSize();
- offHeapEntriesCount = m.getOffHeapEntriesCount();
+
+ offHeapGets = m.getOffHeapGets();
+ offHeapPuts = m.getOffHeapPuts();
+ offHeapRemoves = m.getOffHeapRemovals();
+ offHeapEvicts = m.getOffHeapEvictions();
+ offHeapHits = m.getOffHeapHits();
+ offHeapMisses = m.getOffHeapMisses();
+ offHeapEntriesCnt = m.getOffHeapEntriesCount();
+ offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount();
+ offHeapBackupEntriesCnt = m.getOffHeapBackupEntriesCount();
offHeapAllocatedSize = m.getOffHeapAllocatedSize();
+ offHeapMaxSize = m.getOffHeapMaxSize();
+
+ swapGets = m.getSwapGets();
+ swapPuts = m.getSwapPuts();
+ swapRemoves = m.getSwapRemovals();
+ swapHits = m.getSwapHits();
+ swapMisses = m.getSwapMisses();
+ swapEntriesCnt = m.getSwapEntriesCount();
+ swapSize = m.getSwapSize();
+
size = m.getSize();
keySize = m.getKeySize();
isEmpty = m.isEmpty();
- dhtEvictQueueCurrentSize = m.getDhtEvictQueueCurrentSize();
+ dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize();
txThreadMapSize = m.getTxThreadMapSize();
txXidMapSize = m.getTxXidMapSize();
txCommitQueueSize = m.getTxCommitQueueSize();
txPrepareQueueSize = m.getTxPrepareQueueSize();
- txStartVersionCountsSize = m.getTxStartVersionCountsSize();
+ txStartVerCountsSize = m.getTxStartVersionCountsSize();
txCommittedVersionsSize = m.getTxCommittedVersionsSize();
txRolledbackVersionsSize = m.getTxRolledbackVersionsSize();
txDhtThreadMapSize = m.getTxDhtThreadMapSize();
txDhtXidMapSize = m.getTxDhtXidMapSize();
txDhtCommitQueueSize = m.getTxDhtCommitQueueSize();
txDhtPrepareQueueSize = m.getTxDhtPrepareQueueSize();
- txDhtStartVersionCountsSize = m.getTxDhtStartVersionCountsSize();
+ txDhtStartVerCountsSize = m.getTxDhtStartVersionCountsSize();
txDhtCommittedVersionsSize = m.getTxDhtCommittedVersionsSize();
txDhtRolledbackVersionsSize = m.getTxDhtRolledbackVersionsSize();
isWriteBehindEnabled = m.isWriteBehindEnabled();
writeBehindFlushSize = m.getWriteBehindFlushSize();
- writeBehindFlushThreadCount = m.getWriteBehindFlushThreadCount();
- writeBehindFlushFrequency = m.getWriteBehindFlushFrequency();
+ writeBehindFlushThreadCnt = m.getWriteBehindFlushThreadCount();
+ writeBehindFlushFreq = m.getWriteBehindFlushFrequency();
writeBehindStoreBatchSize = m.getWriteBehindStoreBatchSize();
- writeBehindTotalCriticalOverflowCount = m.getWriteBehindTotalCriticalOverflowCount();
- writeBehindCriticalOverflowCount = m.getWriteBehindCriticalOverflowCount();
- writeBehindErrorRetryCount = m.getWriteBehindErrorRetryCount();
- writeBehindBufferSize = m.getWriteBehindBufferSize();
+ writeBehindTotalCriticalOverflowCnt = m.getWriteBehindTotalCriticalOverflowCount();
+ writeBehindCriticalOverflowCnt = m.getWriteBehindCriticalOverflowCount();
+ writeBehindErrorRetryCnt = m.getWriteBehindErrorRetryCount();
+ writeBehindBufSize = m.getWriteBehindBufferSize();
keyType = m.getKeyType();
- valueType = m.getValueType();
- isStoreByValue = m.isStoreByValue();
+ valType = m.getValueType();
+ isStoreByVal = m.isStoreByValue();
isStatisticsEnabled = m.isStatisticsEnabled();
isManagementEnabled = m.isManagementEnabled();
isReadThrough = m.isReadThrough();
@@ -263,21 +330,23 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
isEmpty = loc.isEmpty();
isWriteBehindEnabled = loc.isWriteBehindEnabled();
writeBehindFlushSize = loc.getWriteBehindFlushSize();
- writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount();
- writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency();
+ writeBehindFlushThreadCnt = loc.getWriteBehindFlushThreadCount();
+ writeBehindFlushFreq = loc.getWriteBehindFlushFrequency();
writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize();
- writeBehindBufferSize = loc.getWriteBehindBufferSize();
+ writeBehindBufSize = loc.getWriteBehindBufferSize();
size = loc.getSize();
keySize = loc.getKeySize();
keyType = loc.getKeyType();
- valueType = loc.getValueType();
- isStoreByValue = loc.isStoreByValue();
+ valType = loc.getValueType();
+ isStoreByVal = loc.isStoreByValue();
isStatisticsEnabled = loc.isStatisticsEnabled();
isManagementEnabled = loc.isManagementEnabled();
isReadThrough = loc.isReadThrough();
isWriteThrough = loc.isWriteThrough();
+ offHeapMaxSize = loc.getOffHeapMaxSize();
+
for (CacheMetrics e : metrics) {
reads += e.getCacheGets();
puts += e.getCachePuts();
@@ -290,7 +359,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
putAvgTimeNanos += e.getAveragePutTime();
getAvgTimeNanos += e.getAverageGetTime();
- removeAvgTimeNanos += e.getAverageRemoveTime();
+ rmvAvgTimeNanos += e.getAverageRemoveTime();
commitAvgTimeNanos += e.getAverageTxCommitTime();
rollbackAvgTimeNanos += e.getAverageTxRollbackTime();
@@ -299,19 +368,35 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
else
overflowSize = -1;
- offHeapEntriesCount += e.getOffHeapEntriesCount();
+ offHeapGets += e.getOffHeapGets();
+ offHeapPuts += e.getOffHeapPuts();
+ offHeapRemoves += e.getOffHeapRemovals();
+ offHeapEvicts += e.getOffHeapEvictions();
+ offHeapHits += e.getOffHeapHits();
+ offHeapMisses += e.getOffHeapMisses();
+ offHeapEntriesCnt += e.getOffHeapEntriesCount();
+ offHeapPrimaryEntriesCnt += e.getOffHeapPrimaryEntriesCount();
+ offHeapBackupEntriesCnt += e.getOffHeapBackupEntriesCount();
offHeapAllocatedSize += e.getOffHeapAllocatedSize();
+ swapGets += e.getSwapGets();
+ swapPuts += e.getSwapPuts();
+ swapRemoves += e.getSwapRemovals();
+ swapHits += e.getSwapHits();
+ swapMisses += e.getSwapMisses();
+ swapEntriesCnt += e.getSwapEntriesCount();
+ swapSize += e.getSwapSize();
+
if (e.getDhtEvictQueueCurrentSize() > -1)
- dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize();
+ dhtEvictQueueCurrSize += e.getDhtEvictQueueCurrentSize();
else
- dhtEvictQueueCurrentSize = -1;
+ dhtEvictQueueCurrSize = -1;
txThreadMapSize += e.getTxThreadMapSize();
txXidMapSize += e.getTxXidMapSize();
txCommitQueueSize += e.getTxCommitQueueSize();
txPrepareQueueSize += e.getTxPrepareQueueSize();
- txStartVersionCountsSize += e.getTxStartVersionCountsSize();
+ txStartVerCountsSize += e.getTxStartVersionCountsSize();
txCommittedVersionsSize += e.getTxCommittedVersionsSize();
txRolledbackVersionsSize += e.getTxRolledbackVersionsSize();
@@ -336,9 +421,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
txDhtPrepareQueueSize = -1;
if (e.getTxDhtStartVersionCountsSize() > -1)
- txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize();
+ txDhtStartVerCountsSize += e.getTxDhtStartVersionCountsSize();
else
- txDhtStartVersionCountsSize = -1;
+ txDhtStartVerCountsSize = -1;
if (e.getTxDhtCommittedVersionsSize() > -1)
txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize();
@@ -351,19 +436,19 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
txDhtRolledbackVersionsSize = -1;
if (e.getWriteBehindTotalCriticalOverflowCount() > -1)
- writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount();
+ writeBehindTotalCriticalOverflowCnt += e.getWriteBehindTotalCriticalOverflowCount();
else
- writeBehindTotalCriticalOverflowCount = -1;
+ writeBehindTotalCriticalOverflowCnt = -1;
if (e.getWriteBehindCriticalOverflowCount() > -1)
- writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount();
+ writeBehindCriticalOverflowCnt += e.getWriteBehindCriticalOverflowCount();
else
- writeBehindCriticalOverflowCount = -1;
+ writeBehindCriticalOverflowCnt = -1;
if (e.getWriteBehindErrorRetryCount() > -1)
- writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount();
+ writeBehindErrorRetryCnt += e.getWriteBehindErrorRetryCount();
else
- writeBehindErrorRetryCount = -1;
+ writeBehindErrorRetryCnt = -1;
}
int size = metrics.size();
@@ -371,7 +456,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
if (size > 1) {
putAvgTimeNanos /= size;
getAvgTimeNanos /= size;
- removeAvgTimeNanos /= size;
+ rmvAvgTimeNanos /= size;
commitAvgTimeNanos /= size;
rollbackAvgTimeNanos /= size;
}
@@ -435,7 +520,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public float getAverageRemoveTime() {
- return removeAvgTimeNanos;
+ return rmvAvgTimeNanos;
}
/** {@inheritDoc} */
@@ -469,8 +554,63 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return offHeapGets;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return offHeapPuts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return offHeapRemoves;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return offHeapEvicts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return offHeapHits;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ if (offHeapHits == 0 || offHeapGets == 0)
+ return 0;
+
+ return (float) offHeapHits / offHeapGets * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return offHeapMisses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ if (offHeapMisses == 0 || offHeapGets == 0)
+ return 0;
+
+ return (float) offHeapMisses / offHeapGets * 100.0f;
+ }
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
- return offHeapEntriesCount;
+ return offHeapEntriesCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ return offHeapPrimaryEntriesCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ return offHeapBackupEntriesCnt;
}
/** {@inheritDoc} */
@@ -479,6 +619,62 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return offHeapMaxSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return swapGets;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return swapPuts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return swapRemoves;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return swapHits;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return swapMisses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ if (swapHits == 0 || swapGets == 0)
+ return 0;
+
+ return (float) swapHits / swapGets * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ if (swapMisses == 0 || swapGets == 0)
+ return 0;
+
+ return (float) swapMisses / swapGets * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ return swapEntriesCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ return swapSize;
+ }
+
+ /** {@inheritDoc} */
@Override public int getSize() {
return size;
}
@@ -495,7 +691,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getDhtEvictQueueCurrentSize() {
- return dhtEvictQueueCurrentSize;
+ return dhtEvictQueueCurrSize;
}
/** {@inheritDoc} */
@@ -520,7 +716,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getTxStartVersionCountsSize() {
- return txStartVersionCountsSize;
+ return txStartVerCountsSize;
}
/** {@inheritDoc} */
@@ -555,7 +751,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getTxDhtStartVersionCountsSize() {
- return txDhtStartVersionCountsSize;
+ return txDhtStartVerCountsSize;
}
/** {@inheritDoc} */
@@ -580,12 +776,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getWriteBehindFlushThreadCount() {
- return writeBehindFlushThreadCount;
+ return writeBehindFlushThreadCnt;
}
/** {@inheritDoc} */
@Override public long getWriteBehindFlushFrequency() {
- return writeBehindFlushFrequency;
+ return writeBehindFlushFreq;
}
/** {@inheritDoc} */
@@ -595,22 +791,22 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getWriteBehindTotalCriticalOverflowCount() {
- return writeBehindTotalCriticalOverflowCount;
+ return writeBehindTotalCriticalOverflowCnt;
}
/** {@inheritDoc} */
@Override public int getWriteBehindCriticalOverflowCount() {
- return writeBehindCriticalOverflowCount;
+ return writeBehindCriticalOverflowCnt;
}
/** {@inheritDoc} */
@Override public int getWriteBehindErrorRetryCount() {
- return writeBehindErrorRetryCount;
+ return writeBehindErrorRetryCnt;
}
/** {@inheritDoc} */
@Override public int getWriteBehindBufferSize() {
- return writeBehindBufferSize;
+ return writeBehindBufSize;
}
/** {@inheritDoc} */
@@ -620,12 +816,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public String getValueType() {
- return valueType;
+ return valType;
}
/** {@inheritDoc} */
@Override public boolean isStoreByValue() {
- return isStoreByValue;
+ return isStoreByVal;
}
/** {@inheritDoc} */
@@ -666,31 +862,49 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
out.writeFloat(putAvgTimeNanos);
out.writeFloat(getAvgTimeNanos);
- out.writeFloat(removeAvgTimeNanos);
+ out.writeFloat(rmvAvgTimeNanos);
out.writeFloat(commitAvgTimeNanos);
out.writeFloat(rollbackAvgTimeNanos);
out.writeLong(overflowSize);
- out.writeLong(offHeapEntriesCount);
+ out.writeLong(offHeapGets);
+ out.writeLong(offHeapPuts);
+ out.writeLong(offHeapRemoves);
+ out.writeLong(offHeapEvicts);
+ out.writeLong(offHeapHits);
+ out.writeLong(offHeapMisses);
+ out.writeLong(offHeapEntriesCnt);
+ out.writeLong(offHeapPrimaryEntriesCnt);
+ out.writeLong(offHeapBackupEntriesCnt);
out.writeLong(offHeapAllocatedSize);
- out.writeInt(dhtEvictQueueCurrentSize);
+ out.writeLong(offHeapMaxSize);
+
+ out.writeLong(swapGets);
+ out.writeLong(swapPuts);
+ out.writeLong(swapRemoves);
+ out.writeLong(swapHits);
+ out.writeLong(swapMisses);
+ out.writeLong(swapEntriesCnt);
+ out.writeLong(swapSize);
+
+ out.writeInt(dhtEvictQueueCurrSize);
out.writeInt(txThreadMapSize);
out.writeInt(txXidMapSize);
out.writeInt(txCommitQueueSize);
out.writeInt(txPrepareQueueSize);
- out.writeInt(txStartVersionCountsSize);
+ out.writeInt(txStartVerCountsSize);
out.writeInt(txCommittedVersionsSize);
out.writeInt(txRolledbackVersionsSize);
out.writeInt(txDhtThreadMapSize);
out.writeInt(txDhtXidMapSize);
out.writeInt(txDhtCommitQueueSize);
out.writeInt(txDhtPrepareQueueSize);
- out.writeInt(txDhtStartVersionCountsSize);
+ out.writeInt(txDhtStartVerCountsSize);
out.writeInt(txDhtCommittedVersionsSize);
out.writeInt(txDhtRolledbackVersionsSize);
- out.writeInt(writeBehindTotalCriticalOverflowCount);
- out.writeInt(writeBehindCriticalOverflowCount);
- out.writeInt(writeBehindErrorRetryCount);
+ out.writeInt(writeBehindTotalCriticalOverflowCnt);
+ out.writeInt(writeBehindCriticalOverflowCnt);
+ out.writeInt(writeBehindErrorRetryCnt);
}
/** {@inheritDoc} */
@@ -706,30 +920,48 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
putAvgTimeNanos = in.readFloat();
getAvgTimeNanos = in.readFloat();
- removeAvgTimeNanos = in.readFloat();
+ rmvAvgTimeNanos = in.readFloat();
commitAvgTimeNanos = in.readFloat();
rollbackAvgTimeNanos = in.readFloat();
overflowSize = in.readLong();
- offHeapEntriesCount = in.readLong();
+ offHeapGets = in.readLong();
+ offHeapPuts = in.readLong();
+ offHeapRemoves = in.readLong();
+ offHeapEvicts = in.readLong();
+ offHeapHits = in.readLong();
+ offHeapMisses = in.readLong();
+ offHeapEntriesCnt = in.readLong();
+ offHeapPrimaryEntriesCnt = in.readLong();
+ offHeapBackupEntriesCnt = in.readLong();
offHeapAllocatedSize = in.readLong();
- dhtEvictQueueCurrentSize = in.readInt();
+ offHeapMaxSize = in.readLong();
+
+ swapGets = in.readLong();
+ swapPuts = in.readLong();
+ swapRemoves = in.readLong();
+ swapHits = in.readLong();
+ swapMisses = in.readLong();
+ swapEntriesCnt = in.readLong();
+ swapSize = in.readLong();
+
+ dhtEvictQueueCurrSize = in.readInt();
txThreadMapSize = in.readInt();
txXidMapSize = in.readInt();
txCommitQueueSize = in.readInt();
txPrepareQueueSize = in.readInt();
- txStartVersionCountsSize = in.readInt();
+ txStartVerCountsSize = in.readInt();
txCommittedVersionsSize = in.readInt();
txRolledbackVersionsSize = in.readInt();
txDhtThreadMapSize = in.readInt();
txDhtXidMapSize = in.readInt();
txDhtCommitQueueSize = in.readInt();
txDhtPrepareQueueSize = in.readInt();
- txDhtStartVersionCountsSize = in.readInt();
+ txDhtStartVerCountsSize = in.readInt();
txDhtCommittedVersionsSize = in.readInt();
txDhtRolledbackVersionsSize = in.readInt();
- writeBehindTotalCriticalOverflowCount = in.readInt();
- writeBehindCriticalOverflowCount = in.readInt();
- writeBehindErrorRetryCount = in.readInt();
+ writeBehindTotalCriticalOverflowCnt = in.readInt();
+ writeBehindCriticalOverflowCnt = in.readInt();
+ writeBehindErrorRetryCnt = in.readInt();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..772e849 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -121,6 +121,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
warnFirstEvict();
writeToSwap(part, cctx.toCacheKeyObject(kb), vb);
+
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapEvict();
}
catch (IgniteCheckedException e) {
log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e);
@@ -395,8 +398,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Reconstituted swap entry or {@code null} if entry is obsolete.
* @throws IgniteCheckedException If failed.
*/
- @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException
- {
+ @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException {
assert e != null;
checkIteratorQueue();
@@ -425,9 +427,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
// First check off-heap store.
- if (offheapEnabled)
- if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())))
+ if (offheapEnabled) {
+ boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRead(contains);
+
+ if (contains)
return true;
+ }
if (swapEnabled) {
assert key != null;
@@ -436,6 +444,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
cctx.deploy().globalLoader());
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapRead(valBytes != null);
+
return valBytes != null;
}
@@ -444,7 +455,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param key Key to read.
- * @param keyBytes Key bytes.
* @param part Key partition.
* @param entryLocked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
@@ -481,6 +491,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (readOffheap && offheapEnabled) {
byte[] bytes = offheap.get(spaceName, part, key, keyBytes);
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRead(bytes != null);
+
if (bytes != null)
return swapEntry(unmarshalSwapEntry(bytes));
}
@@ -524,6 +537,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapEnabled) {
byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ if (cctx.config().isStatisticsEnabled()) {
+ if (entryBytes != null)
+ cctx.cache().metrics0().onOffHeapRemove();
+
+ cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
+ }
+
if (entryBytes != null) {
GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
@@ -567,8 +587,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Value from swap or {@code null}.
* @throws IgniteCheckedException If failed.
*/
- @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key,
- final int part)
+ @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, final int part)
throws IgniteCheckedException {
if (!swapEnabled)
return null;
@@ -582,6 +601,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapRead(rmv != null);
+
if (rmv != null) {
try {
GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
@@ -611,6 +633,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
null);
}
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapRemove();
+
// Always fire this event, since preloading depends on it.
onUnswapped(part, key, entry);
@@ -649,12 +674,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (!offheapEnabled && !swapEnabled)
return null;
- return read(entry.key(),
- entry.key().valueBytes(cctx.cacheObjectContext()),
- entry.partition(),
- locked,
- readOffheap,
- readSwap);
+ return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
+ readOffheap, readSwap);
}
/**
@@ -730,6 +751,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
final GridCacheQueryManager qryMgr = cctx.queries();
Collection<SwapKey> unprocessedKeys = null;
+
final Collection<GridCacheBatchSwapEntry> res = new ArrayList<>(keys.size());
// First try removing from offheap.
@@ -737,8 +759,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
for (KeyCacheObject key : keys) {
int part = cctx.affinity().partition(key);
- byte[] entryBytes =
- offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+ if(entryBytes != null && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
if (entryBytes != null) {
GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
@@ -848,6 +872,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
null);
}
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapRemove();
+
// Always fire this event, since preloading depends on it.
onUnswapped(swapKey.partition(), key, entry);
@@ -880,7 +907,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+ if(rmv && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
+
+ return rmv;
}
/**
@@ -925,6 +957,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
return;
try {
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapRemove();
+
GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
if (entry == null)
@@ -942,11 +977,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
// First try offheap.
if (offheapEnabled) {
- byte[] val = offheap.remove(spaceName,
- part,
- key.value(cctx.cacheObjectContext(), false),
+ byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false),
key.valueBytes(cctx.cacheObjectContext()));
+ if(val != null && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
+
if (val != null) {
if (c != null)
c.apply(val); // Probably we should read value and apply closure before removing...
@@ -1007,6 +1043,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapEnabled) {
offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal());
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapWrite();
+
if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
@@ -1035,11 +1074,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapEnabled) {
for (GridCacheBatchSwapEntry swapEntry : swapped) {
- offheap.put(spaceName,
- swapEntry.partition(),
- swapEntry.key(),
- swapEntry.key().valueBytes(cctx.cacheObjectContext()),
- swapEntry.marshal());
+ offheap.put(spaceName, swapEntry.partition(), swapEntry.key(),
+ swapEntry.key().valueBytes(cctx.cacheObjectContext()), swapEntry.marshal());
+
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapWrite();
if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
@@ -1071,6 +1110,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
qryMgr.onSwap(batchSwapEntry.key());
}
}
+
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapWrite(batch.size());
}
}
@@ -1082,17 +1124,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param entry Entry bytes.
* @throws IgniteCheckedException If failed.
*/
- private void writeToSwap(int part,
- KeyCacheObject key,
- byte[] entry)
- throws IgniteCheckedException
- {
+ private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException {
checkIteratorQueue();
swapMgr.write(spaceName,
new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
- entry,
- cctx.deploy().globalLoader());
+ entry, cctx.deploy().globalLoader());
+
+ if (cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onSwapWrite();
if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED))
cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid) null, null,
@@ -1274,7 +1314,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+ if(rmv && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
}
else
it.removeX();
@@ -1432,6 +1475,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
return it.hasNext();
}
+ @SuppressWarnings("unchecked")
@Override protected void onRemove() throws IgniteCheckedException {
if (cur == null)
throw new IllegalStateException("Method next() has not yet been called, or the remove() method " +
@@ -1616,7 +1660,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+ if(rmv && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
}
@Override protected void onClose() throws IgniteCheckedException {
@@ -1646,7 +1693,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+ boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+ if(rmv && cctx.config().isStatisticsEnabled())
+ cctx.cache().metrics0().onOffHeapRemove();
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
index 2ad07b5..5cdc72f 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java
@@ -100,14 +100,94 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean,
public long getOverflowSize();
/** {@inheritDoc} */
+ @MXBeanDescription("Number of gets from off-heap memory.")
+ public long getOffHeapGets();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of puts to off-heap memory.")
+ public long getOffHeapPuts();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of removed entries from off-heap memory.")
+ public long getOffHeapRemovals();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of evictions from off-heap memory.")
+ public long getOffHeapEvictions();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of hits on off-heap memory.")
+ public long getOffHeapHits();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Percentage of hits on off-heap memory.")
+ public float getOffHeapHitPercentage();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of misses on off-heap memory.")
+ public long getOffHeapMisses();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Percentage of misses on off-heap memory.")
+ public float getOffHeapMissPercentage();
+
+ /** {@inheritDoc} */
@MXBeanDescription("Number of entries stored in off-heap memory.")
public long getOffHeapEntriesCount();
/** {@inheritDoc} */
+ @MXBeanDescription("Number of primary entries stored in off-heap memory.")
+ public long getOffHeapPrimaryEntriesCount();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of backup entries stored in off-heap memory.")
+ public long getOffHeapBackupEntriesCount();
+
+ /** {@inheritDoc} */
@MXBeanDescription("Memory size allocated in off-heap.")
public long getOffHeapAllocatedSize();
/** {@inheritDoc} */
+ @MXBeanDescription("Off-heap memory maximum size.")
+ public long getOffHeapMaxSize();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of gets from swap.")
+ public long getSwapGets();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of puts to swap.")
+ public long getSwapPuts();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of removed entries from swap.")
+ public long getSwapRemovals();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of hits on swap.")
+ public long getSwapHits();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of misses on swap.")
+ public long getSwapMisses();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Percentage of hits on swap.")
+ public float getSwapHitPercentage();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Percentage of misses on swap.")
+ public float getSwapMissPercentage();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Number of entries stored in swap.")
+ public long getSwapEntriesCount();
+
+ /** {@inheritDoc} */
+ @MXBeanDescription("Size of swap.")
+ public long getSwapSize();
+
+ /** {@inheritDoc} */
@MXBeanDescription("Number of non-null values in the cache.")
public int getSize();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 871512c..b13cc97 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.swapspace.*;
+
import org.jetbrains.annotations.*;
import javax.management.*;
@@ -453,19 +453,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
boolean isSpiConsistent = false;
- String tipStr = " (fix configuration or set " + "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)";
+ String tipStr = " (fix configuration or set " +
+ "-D" + IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK + "=true system property)";
if (rmtCls == null) {
if (!optional && starting)
- throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr + " [name=" + name +
- ", loc=" + locCls + ']');
+ throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr +
+ " [name=" + name + ", loc=" + locCls + ']');
sb.a(format(">>> Remote SPI with the same name is not configured: " + name, locCls));
}
else if (!locCls.equals(rmtCls)) {
if (!optional && starting)
- throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr + " [name=" + name +
- ", loc=" + locCls + ", rmt=" + rmtCls + ']');
+ throw new IgniteSpiException("Remote SPI with the same name is of different type" + tipStr +
+ " [name=" + name + ", loc=" + locCls + ", rmt=" + rmtCls + ']');
sb.a(format(">>> Remote SPI with the same name is of different type: " + name, locCls, rmtCls));
}
@@ -627,27 +628,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
}
/** {@inheritDoc} */
- @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val,
- @Nullable ClassLoader ldr) {
- /* No-op. */
- }
-
- /** {@inheritDoc} */
- @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public int partition(String cacheName, Object key) {
return -1;
}
/** {@inheritDoc} */
- @Override public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) {
- // No-op.
- }
-
- /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes() {
return locNode == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
}
@@ -713,12 +698,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
- @Nullable ClassLoader ldr) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public MessageFormatter messageFormatter() {
return msgFormatter;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 6852b6d..55f46e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
-import org.apache.ignite.spi.swapspace.*;
import org.jetbrains.annotations.*;
import javax.cache.*;
@@ -253,30 +252,6 @@ public interface IgniteSpiContext {
public <K> boolean containsKey(String cacheName, K key);
/**
- * Writes object to swap.
- *
- * @param spaceName Swap space name.
- * @param key Key.
- * @param val Value.
- * @param ldr Class loader (optional).
- * @throws IgniteException If any exception occurs.
- */
- public void writeToSwap(String spaceName, Object key, @Nullable Object val, @Nullable ClassLoader ldr)
- throws IgniteException;
-
- /**
- * Reads object from swap.
- *
- * @param spaceName Swap space name.
- * @param key Key.
- * @param ldr Class loader (optional).
- * @return Swapped value.
- * @throws IgniteException If any exception occurs.
- */
- @Nullable public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr)
- throws IgniteException;
-
- /**
* Calculates partition number for given key.
*
* @param cacheName Cache name.
@@ -286,16 +261,6 @@ public interface IgniteSpiContext {
public int partition(String cacheName, Object key);
/**
- * Removes object from swap.
- *
- * @param spaceName Swap space name.
- * @param key Key.
- * @param ldr Class loader (optional).
- * @throws IgniteException If any exception occurs.
- */
- public void removeFromSwap(String spaceName, Object key, @Nullable ClassLoader ldr) throws IgniteException;
-
- /**
* Validates that new node can join grid topology, this method is called on coordinator
* node before new node joins topology.
*
@@ -322,18 +287,6 @@ public interface IgniteSpiContext {
public SecuritySubject authenticatedSubject(UUID subjId) throws IgniteException;
/**
- * Reads swapped cache value from off-heap and swap.
- *
- * @param spaceName Off-heap space name.
- * @param key Key.
- * @param ldr Class loader for unmarshalling.
- * @return Value.
- * @throws IgniteException If any exception occurs.
- */
- @Nullable public <T> T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key,
- @Nullable ClassLoader ldr) throws IgniteException;
-
- /**
* Gets message formatter.
*
* @return Message formatter.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3f012b77/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index e7db285..7a88426 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -387,15 +387,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
Space space = space(spaceName, false);
- if (space == null)
- return;
-
- byte[] val = space.remove(key, c != null);
+ byte[] val = space == null ? null : space.remove(key, c != null);
if (c != null)
c.apply(val);
- notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
+ if (space != null)
+ notifyListener(EVT_SWAP_SPACE_DATA_REMOVED, spaceName);
}
/** {@inheritDoc} */
[05/10] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong
staging permissions while running MR job under hadoop accelerator): IGFS
part.
Posted by se...@apache.org.
[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8455c7a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8455c7a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8455c7a6
Branch: refs/heads/ignite-943
Commit: 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b
Parents: 3538819
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 15:40:26 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 15:40:26 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/igfs/IgfsUserContext.java | 119 +++++++++++
.../hadoop/fs/HadoopLazyConcurrentMap.java | 204 +++++++++++++++++++
2 files changed, 323 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+ /** Thread local to hold the current user context. */
+ private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+ /**
+ * Executes given callable in the given user context.
+ * The main contract of this method is that {@link #currentUser()} method invoked
+ * inside closure always returns 'user' this callable executed with.
+ * @param user the user name to invoke closure on behalf of.
+ * @param clo the closure to execute
+ * @param <T> The type of closure result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+ */
+ public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clo.apply(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clo.apply();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts
+ * callable that throws checked Exception.
+ * The Exception is never wrapped in any way.
+ * If your Callable throws some specific checked Exceptions, the recommended usage pattern is:
+ * <pre name="code" class="java">
+ * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+ * try {
+ * return IgfsUserContext.doAs(user, new Callable<Foo>() {
+ * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+ * return makeSomeFoo(); // do the job
+ * }
+ * });
+ * }
+ * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+ * throw e;
+ * }
+ * catch (Exception e) {
+ * throw new AssertionError("Must never go there.");
+ * }
+ * }
+ * </pre>
+ * @param user the user name to invoke closure on behalf of.
+ * @param clbl the Callable to execute
+ * @param <T> The type of callable result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+ */
+ public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clbl.call(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clbl.call();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Gets the current context user.
+ * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+ * return null. Otherwise it will return the user name set by the innermost
+ * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+ * @return The current user, may be null.
+ */
+ @Nullable public static String currentUser() {
+ return userStackThreadLocal.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8455c7a6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
new file mode 100644
index 0000000..71b38c4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.jsr166.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Maps values by keys.
+ * Values are created lazily using {@link ValueFactory}.
+ *
+ * Despite the name, it does not depend on any Hadoop classes.
+ */
+public class HadoopLazyConcurrentMap<K, V extends Closeable> {
+ /** The map storing the actual values. */
+ private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>();
+
+ /** The factory passed in by the client. Will be used for lazy value creation. */
+ private final ValueFactory<K, V> factory;
+
+ /** Lock used to close the objects. */
+ private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+
+ /** Flag indicating that this map is closed and cleared. */
+ private boolean closed;
+
+ /**
+ * Constructor.
+ * @param factory the factory to create new values lazily.
+ */
+ public HadoopLazyConcurrentMap(ValueFactory<K, V> factory) {
+ this.factory = factory;
+ }
+
+ /**
+ * Gets cached or creates a new value of V.
+ * Never returns null.
+ * @param k the key to associate the value with.
+ * @return the cached or newly created value, never null.
+ * @throws IgniteException on error
+ */
+ public V getOrCreate(K k) {
+ ValueWrapper w = map.get(k);
+
+ if (w == null) {
+ closeLock.readLock().lock();
+
+ try {
+ if (closed)
+ throw new IllegalStateException("Failed to create value for key [" + k
+ + "]: the map is already closed.");
+
+ final ValueWrapper wNew = new ValueWrapper(k);
+
+ w = map.putIfAbsent(k, wNew);
+
+ if (w == null) {
+ wNew.init();
+
+ w = wNew;
+ }
+ }
+ finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
+ try {
+ V v = w.getValue();
+
+ assert v != null;
+
+ return v;
+ }
+ catch (IgniteCheckedException ie) {
+ throw new IgniteException(ie);
+ }
+ }
+
+ /**
+ * Clears the map and closes all the values.
+ */
+ public void close() throws IgniteCheckedException {
+ closeLock.writeLock().lock();
+
+ try {
+ closed = true;
+
+ Exception err = null;
+
+ Set<K> keySet = map.keySet();
+
+ for (K key : keySet) {
+ V v = null;
+
+ try {
+ v = map.get(key).getValue();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+
+ if (v != null) {
+ try {
+ v.close();
+ }
+ catch (Exception err0) {
+ if (err == null)
+ err = err0;
+ }
+ }
+ }
+
+ map.clear();
+
+ if (err != null)
+ throw new IgniteCheckedException(err);
+ }
+ finally {
+ closeLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Helper class that drives the lazy value creation.
+ */
+ private class ValueWrapper {
+ /** Future. */
+ private final GridFutureAdapter<V> fut = new GridFutureAdapter<>();
+
+ /** the key */
+ private final K key;
+
+ /**
+ * Creates new wrapper.
+ */
+ private ValueWrapper(K key) {
+ this.key = key;
+ }
+
+ /**
+ * Initializes the value using the factory.
+ */
+ private void init() {
+ try {
+ final V v0 = factory.createValue(key);
+
+ if (v0 == null)
+ throw new IgniteException("Failed to create non-null value. [key=" + key + ']');
+
+ fut.onDone(v0);
+ }
+ catch (Throwable e) {
+ fut.onDone(e);
+ }
+ }
+
+ /**
+ * Gets the available value or blocks until the value is initialized.
+ * @return the value, never null.
+ * @throws IgniteCheckedException on error.
+ */
+ V getValue() throws IgniteCheckedException {
+ return fut.get();
+ }
+ }
+
+ /**
+ * Interface representing the factory that creates map values.
+ * @param <K> the type of the key.
+ * @param <V> the type of the value.
+ */
+ public interface ValueFactory <K, V> {
+ /**
+ * Creates the new value. Should never return null.
+ *
+ * @param key the key to create value for
+ * @return the value.
+ * @throws IgniteException on failure.
+ */
+ public V createValue(K key);
+ }
+}
\ No newline at end of file
[03/10] incubator-ignite git commit: [IGNITE-958]: IGNITE-218 (Wrong
staging permissions while running MR job under hadoop accelerator): IGFS
part.
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 7dca049..f23c62c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
/** IGFS name. */
private final String igfs;
+ /** The user this out proc is performing on behalf of. */
+ private final String userName;
+
/** Client log. */
private final Log log;
@@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
* @param log Client logger.
* @throws IOException If failed.
*/
- public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
- this(host, port, grid, igfs, false, log);
+ public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
+ this(host, port, grid, igfs, false, log, user);
}
/**
@@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
* @param log Client logger.
* @throws IOException If failed.
*/
- public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
- this(null, port, grid, igfs, true, log);
+ public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
+ this(null, port, grid, igfs, true, log, user);
}
/**
@@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
* @param log Client logger.
* @throws IOException If failed.
*/
- private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
+ private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
throws IOException {
assert host != null && !shmem || host == null && shmem :
"Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
@@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
this.grid = grid;
this.igfs = igfs;
this.log = log;
+ this.userName = IgfsUtils.fixUserName(user);
io = HadoopIgfsIpcIo.get(log, endpoint);
@@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(INFO);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(FILE_RES).get();
}
@@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(UPDATE);
msg.path(path);
msg.properties(props);
+ msg.userName(userName);
return io.send(msg).chain(FILE_RES).get();
}
@@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.accessTime(accessTime);
msg.modificationTime(modificationTime);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(RENAME);
msg.path(src);
msg.destinationPath(dest);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(DELETE);
msg.path(path);
msg.flag(recursive);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.start(start);
msg.length(len);
+ msg.userName(userName);
return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
}
@@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(PATH_SUMMARY);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(SUMMARY_RES).get();
}
@@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(MAKE_DIRECTORIES);
msg.path(path);
msg.properties(props);
+ msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
@@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(LIST_FILES);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(FILE_COL_RES).get();
}
@@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(LIST_PATHS);
msg.path(path);
+ msg.userName(userName);
return io.send(msg).chain(PATH_COL_RES).get();
}
@@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.command(OPEN_READ);
msg.path(path);
msg.flag(false);
+ msg.userName(userName);
IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
@@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.flag(true);
msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+ msg.userName(userName);
IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
@@ -321,6 +337,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.properties(props);
msg.replication(replication);
msg.blockSize(blockSize);
+ msg.userName(userName);
Long streamId = io.send(msg).chain(LONG_RES).get();
@@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
msg.path(path);
msg.flag(create);
msg.properties(props);
+ msg.userName(userName);
Long streamId = io.send(msg).chain(LONG_RES).get();
@@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
}
};
}
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ return userName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1dada21..7d0db49 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
/** Logger. */
private final Log log;
+ /** The user name this wrapper works on behalf of. */
+ private final String userName;
+
/**
* Constructor.
*
@@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
* @param conf Configuration.
* @param log Current logger.
*/
- public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+ public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+ throws IOException {
try {
this.authority = authority;
this.endpoint = new HadoopIgfsEndpoint(authority);
this.logDir = logDir;
this.conf = conf;
this.log = log;
+ this.userName = user;
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to parse endpoint: " + authority, e);
@@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
HadoopIgfsEx hadoop = null;
try {
- hadoop = new HadoopIgfsInProc(igfs, log);
+ hadoop = new HadoopIgfsInProc(igfs, log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
catch (IOException | IgniteCheckedException e) {
if (e instanceof HadoopIgfsCommunicationException)
- hadoop.close(true);
+ if (hadoop != null)
+ hadoop.close(true);
if (log.isDebugEnabled())
log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
@@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
HadoopIgfsEx hadoop = null;
try {
- hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+ hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
@@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
try {
hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
- log);
+ log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
@@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
HadoopIgfsEx hadoop = null;
try {
- hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+ hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+ log, userName);
curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index e9c859bd..dd18c66 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -239,9 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
try {
- FileSystem fs = FileSystem.get(jobConf());
-
- HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+ FileSystem.get(jobConf());
LocalFileSystem locFs = FileSystem.getLocal(jobConf());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index d11cabb..9bcd5de 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
@@ -39,6 +40,7 @@ import org.jsr166.*;
import java.io.*;
import java.net.*;
+import java.security.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
/** Thread count for multithreaded tests. */
private static final int THREAD_CNT = 8;
+ /** Secondary file system user. */
+ private static final String SECONDARY_FS_USER = "secondary-default";
+
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
if (mode != PRIMARY)
cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(),
- secondaryFileSystemConfigPath()));
+ secondaryFileSystemConfigPath(), SECONDARY_FS_USER));
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
cfg.setManagementPort(-1);
@@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath()));
- fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+ UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+
+ // Create Fs on behalf of the client user:
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override public Object run() throws Exception {
+ fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+
+ return null;
+ }
+ });
barrier = new CyclicBarrier(THREAD_CNT);
}
+ /**
+ * Gets the user the Fs client operates on behalf of.
+ * @return The user the Fs client operates on behalf of.
+ */
+ protected String getClientFsUser() {
+ return "foo";
+ }
+
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
try {
@@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
/** @throws Exception If failed. */
public void testStatus() throws Exception {
+ Path file1 = new Path("/file1");
- try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class),
+ try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class),
Options.CreateOpts.perms(FsPermission.getDefault()))) {
file.write(new byte[1024 * 1024]);
}
FsStatus status = fs.getFsStatus();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner());
+
assertEquals(4, grid(0).cluster().nodes().size());
long used = 0, max = 0;
@@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
os.close();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
fs.setOwner(file, "aUser", "aGroup");
assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
int cnt = 2 * 1024;
- FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
- Options.CreateOpts.perms(FsPermission.getDefault()));
+ try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
+ Options.CreateOpts.perms(FsPermission.getDefault()))) {
- for (long i = 0; i < cnt; i++)
- out.writeLong(i);
+ for (long i = 0; i < cnt; i++)
+ out.writeLong(i);
+ }
- out.close();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
- FSDataInputStream in = fs.open(file, 1024);
+ try (FSDataInputStream in = fs.open(file, 1024)) {
- for (long i = 0; i < cnt; i++)
- assertEquals(i, in.readLong());
-
- in.close();
+ for (long i = 0; i < cnt; i++)
+ assertEquals(i, in.readLong());
+ }
}
/** @throws Exception If failed. */
@@ -1191,6 +1218,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+ assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+ assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
}
/** @throws Exception If failed. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 9e84c51..b089995 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
primaryConfFullPath = null;
SecondaryFileSystemProvider provider =
- new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null);
+ new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
- primaryFs = provider.createFileSystem();
+ primaryFs = provider.createFileSystem(null);
primaryFsUri = provider.uri();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index d9a3c59..b828aad 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
@@ -43,6 +44,7 @@ import org.jsr166.*;
import java.io.*;
import java.lang.reflect.*;
import java.net.*;
+import java.security.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -72,6 +74,9 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
/** Secondary file system configuration path. */
private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
+ /** Secondary file system user. */
+ private static final String SECONDARY_FS_USER = "secondary-default";
+
/** Secondary endpoint configuration. */
protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG;
@@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500";
}
+ /**
+ * Gets the user the Fs client operates on behalf of.
+ * @return The user the Fs client operates on behalf of.
+ */
+ protected String getClientFsUser() {
+ return "foo";
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true);
@@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem);
- fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+ UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+ assertNotNull(clientUgi);
+
+ // Create the Fs on behalf of the specific user:
+ clientUgi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override public Object run() throws Exception {
+ fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+
+ return null;
+ }
+ });
barrier = new CyclicBarrier(THREAD_CNT);
}
@@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
cfg.setDefaultMode(mode);
if (mode != PRIMARY)
- cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH));
+ cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+ SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
@@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
os.close();
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
fs.setOwner(file, "aUser", "aGroup");
assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
int cnt = 2 * 1024;
- FSDataOutputStream out = fs.create(file, true, 1024);
-
- for (long i = 0; i < cnt; i++)
- out.writeLong(i);
+ try (FSDataOutputStream out = fs.create(file, true, 1024)) {
- out.close();
+ for (long i = 0; i < cnt; i++)
+ out.writeLong(i);
+ }
- FSDataInputStream in = fs.open(file, 1024);
+ assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
- for (long i = 0; i < cnt; i++)
- assertEquals(i, in.readLong());
+ try (FSDataInputStream in = fs.open(file, 1024)) {
- in.close();
+ for (long i = 0; i < cnt; i++)
+ assertEquals(i, in.readLong());
+ }
}
/** @throws Exception If failed. */
@@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
String path = fs.getFileStatus(file).getPath().toString();
- assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file"));
+ assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file"));
}
/** @throws Exception If failed. */
@@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
public void testGetWorkingDirectoryIfDefault() throws Exception {
String path = fs.getWorkingDirectory().toString();
- assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous")));
+ assertTrue(path.endsWith("/user/" + getClientFsUser()));
}
/** @throws Exception If failed. */
@@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
@SuppressWarnings("OctalInteger")
public void testMkdirs() throws Exception {
Path fsHome = new Path(PRIMARY_URI);
- Path dir = new Path(fsHome, "/tmp/staging");
- Path nestedDir = new Path(dir, "nested");
+ final Path dir = new Path(fsHome, "/tmp/staging");
+ final Path nestedDir = new Path(dir, "nested");
- FsPermission dirPerm = FsPermission.createImmutable((short)0700);
- FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
+ final FsPermission dirPerm = FsPermission.createImmutable((short)0700);
+ final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
assertTrue(fs.mkdirs(dir, dirPerm));
assertTrue(fs.mkdirs(nestedDir, nestedDirPerm));
assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+ assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+ assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
}
/** @throws Exception If failed. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
index b92b213..fcfd587 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
@@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest
try {
switchHandlerErrorFlag(true);
- HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG);
+ HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null);
client.handshake(null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
index e103c5f..2c17ba9 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
@@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe
Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null);
+ cache.clear(); // Avoid influence of previous tests run in the same process.
+
String name = "igfs:" + getTestGridName(0) + "@";
Configuration cfg = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
index 8cf31a2..5f90bd4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*;
* Test file systems for the working directory multi-threading support.
*/
public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+ /** The number of threads. */
private static final int THREAD_COUNT = 3;
/** {@inheritDoc} */
@@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
try {
int curThreadNum = threadNum.getAndIncrement();
- FileSystem fs = FileSystem.get(uri, cfg);
-
- HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
-
if ("file".equals(uri.getScheme()))
FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
@@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
}
/**
- * Test IGFS multi-thread working directory.
- *
- * @throws Exception If fails.
- */
- public void testIgfs() throws Exception {
- testFileSystem(URI.create(igfsScheme()));
- }
-
- /**
- * Test HDFS multi-thread working directory.
- *
- * @throws Exception If fails.
- */
- public void testHdfs() throws Exception {
- testFileSystem(URI.create("hdfs://localhost/"));
- }
-
- /**
* Test LocalFS multi-thread working directory.
*
* @throws Exception If fails.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
index 8a046e0..89bf830 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
int sigma = max((int)ceil(precission * exp), 5);
- X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission +
+ X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission +
" sigma: " + sigma);
- assertTrue(abs(exp - levelsCnts[level]) <= sigma);
+ assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
}
}