You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/21 08:58:24 UTC
[1/8] ignite git commit: ignite-3810 Fixed hang in FileSwapSpaceSpi
when too large value is stored
Repository: ignite
Updated Branches:
refs/heads/master bcbe8cc44 -> 86d31537f
ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d
Branch: refs/heads/master
Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2
Parents: 9b72d18
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 19 18:19:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 19 18:19:33 2016 +0300
----------------------------------------------------------------------
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +++++++--
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 ++++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
4 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 8809f08..9be5b93 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
if (space == null && create) {
validateName(name);
- Space old = spaces.putIfAbsent(masked, space = new Space(masked));
+ Space old = spaces.putIfAbsent(masked, space = new Space(masked, log));
if (old != null)
space = old;
@@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
/** */
private final int maxSize;
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private boolean queueSizeWarn;
+
/**
* @param minTakeSize Min size.
* @param maxSize Max size.
+ * @param log logger
*/
- private SwapValuesQueue(int minTakeSize, int maxSize) {
+ private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) {
this.minTakeSize = minTakeSize;
this.maxSize = maxSize;
+ this.log = log;
}
/**
@@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
lock.lock();
try {
- while (size + val.len > maxSize)
- mayAdd.await();
+ boolean largeVal = val.len > maxSize;
+
+ if (largeVal) {
+ if (!queueSizeWarn) {
+ U.warn(log, "Trying to save in swap entry which have size more than write queue size. " +
+ "You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration " +
+ "[queueMaxSize=" + maxSize + ", valSize=" + val.len + ']');
+
+ queueSizeWarn = true;
+ }
+
+ while (size >= minTakeSize)
+ mayAdd.await();
+ }
+ else {
+ while (size + val.len > maxSize)
+ mayAdd.await();
+ }
size += val.len;
@@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
private SwapFile right;
/** */
- private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize);
+ private final SwapValuesQueue que;
/** Partitions. */
private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts =
@@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
/**
* @param name Space name.
+ * @param log Logger.
*/
- private Space(String name) {
+ private Space(String name, IgniteLogger log) {
assert name != null;
this.name = name;
+ this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
new file mode 100644
index 0000000..8d189fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+
+/**
+ *
+ */
+public class CacheSwapUnswapGetTestSmallQueueSize extends CacheSwapUnswapGetTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2);
+
+ return cfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index 64652b1..ab21165 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -25,11 +25,14 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest;
import org.apache.ignite.spi.swapspace.SwapKey;
import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import org.junit.Assert;
/**
* Test for {@link FileSwapSpaceSpi}.
@@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe
assertEquals(hash0, hash1);
}
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testSaveValueLargeThenQueueSize() throws IgniteCheckedException {
+ final String spaceName = "mySpace";
+ final SwapKey key = new SwapKey("key");
+
+ final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2];
+ Arrays.fill(val, (byte)1);
+
+ IgniteInternalFuture<byte[]> fut = GridTestUtils.runAsync(new Callable<byte[]>() {
+ @Override public byte[] call() throws Exception {
+ return saveAndGet(spaceName, key, val);
+ }
+ });
+
+ byte[] bytes = fut.get(10_000);
+
+ Assert.assertArrayEquals(val, bytes);
+ }
+
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testSaveValueLargeThenQueueSizeMultiThreaded() throws Exception {
+ final String spaceName = "mySpace";
+
+ final int threads = 5;
+
+ long DURATION = 30_000;
+
+ final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2;
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ try {
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!done.get()) {
+ SwapKey key = new SwapKey(rnd.nextInt(1000));
+
+ spi.store(spaceName, key, new byte[rnd.nextInt(0, maxSize)], context());
+ }
+
+ return null;
+ }
+ }, threads, " async-put");
+
+ Thread.sleep(DURATION);
+
+ done.set(true);
+
+ fut.get();
+ }
+ finally {
+ done.set(true);
+ }
+ }
+
+ /**
+ * @param spaceName Space name.
+ * @param key Key.
+ * @param val Value.
+ * @throws Exception If failed.
+ * @return Read bytes.
+ */
+ private byte[] saveAndGet(final String spaceName, final SwapKey key, byte[] val) throws Exception {
+ spi.store(spaceName, key, val, context());
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return spi.read(spaceName, key, context()) != null;
+ }
+ }, 10_000);
+
+ byte[] res = spi.read(spaceName, key, context());
+
+ assertNotNull(res);
+
+ return res;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 60d59d7..c494e73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize;
import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
@@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class);
suite.addTestSuite(CacheSwapUnswapGetTest.class);
+ suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class);
suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);
[8/8] ignite git commit: Merge branch ignite-1.7.2 into master.
Posted by ak...@apache.org.
Merge branch ignite-1.7.2 into master.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86d31537
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86d31537
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86d31537
Branch: refs/heads/master
Commit: 86d31537f8f9485bbc197f197b6ddd1cada46217
Parents: bcbe8cc 85c47eb
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:59:04 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:59:04 2016 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 42 ++++---
.../cache/store/jdbc/CacheJdbcPojoStore.java | 5 +-
.../internal/processors/igfs/IgfsContext.java | 35 ++++++
.../processors/igfs/IgfsDataManager.java | 121 ++++++++-----------
.../internal/processors/igfs/IgfsImpl.java | 82 ++++++++++---
.../processors/igfs/IgfsInputStreamImpl.java | 103 +++++++++++-----
.../ignite/spi/discovery/tcp/ClientImpl.java | 18 ++-
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +++++-
.../binary/BinaryObjectToStringSelfTest.java | 17 +++
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 ++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
12 files changed, 447 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
[3/8] ignite git commit: IGNITE-3859: IGFS: Support direct PROXY mode
invocation in the open method,
add proxy mode to IgfsInputStreamImpl This closes #1065. This closes #1083.
Posted by ak...@apache.org.
IGNITE-3859: IGFS: Support direct PROXY mode invocation in the open method, add proxy mode to IgfsInputStreamImpl
This closes #1065. This closes #1083.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a35ee9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a35ee9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a35ee9d
Branch: refs/heads/master
Commit: 5a35ee9dad194b3009151b79f0ebd3976bb8fd22
Parents: 2474e2b
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Tue Sep 20 14:10:55 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Tue Sep 20 14:10:55 2016 +0500
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsContext.java | 35 ++++++
.../processors/igfs/IgfsDataManager.java | 121 ++++++++-----------
.../internal/processors/igfs/IgfsImpl.java | 82 ++++++++++---
.../processors/igfs/IgfsInputStreamImpl.java | 103 +++++++++++-----
4 files changed, 226 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3e01246..3405b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.igfs;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.internal.GridKernalContext;
@@ -60,6 +63,12 @@ public class IgfsContext {
/** Local cluster node. */
private volatile ClusterNode locNode;
+ /** IGFS executor service. */
+ private ExecutorService igfsSvc;
+
+ /** Logger. */
+ protected IgniteLogger log;
+
/**
* @param ctx Kernal context.
* @param cfg IGFS configuration.
@@ -85,6 +94,10 @@ public class IgfsContext {
this.srvMgr = add(srvMgr);
this.fragmentizerMgr = add(fragmentizerMgr);
+ log = ctx.log(IgfsContext.class);
+
+ igfsSvc = ctx.getIgfsExecutorService();
+
igfs = new IgfsImpl(this);
}
@@ -206,6 +219,28 @@ public class IgfsContext {
}
/**
+ * Executes runnable in IGFS executor service. If execution rejected, runnable will be executed
+ * in caller thread.
+ *
+ * @param r Runnable to execute.
+ */
+ public void runInIgfsThreadPool(Runnable r) {
+ try {
+ igfsSvc.submit(r);
+ }
+ catch (RejectedExecutionException ignored) {
+ // This exception will happen if network speed is too low and data comes faster
+ // than we can send it to remote nodes.
+ try {
+ r.run();
+ }
+ catch (Exception e) {
+ log.warning("Failed to execute IGFS runnable: " + r, e);
+ }
+ }
+ }
+
+ /**
* Adds manager to managers list.
*
* @param mgr Manager.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d2183f9..2f704ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -74,12 +73,9 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -123,9 +119,6 @@ public class IgfsDataManager extends IgfsManager {
/** Affinity key generator. */
private AtomicLong affKeyGen = new AtomicLong();
- /** IGFS executor service. */
- private ExecutorService igfsSvc;
-
/** Request ID counter for write messages. */
private AtomicLong reqIdCtr = new AtomicLong();
@@ -183,8 +176,6 @@ public class IgfsDataManager extends IgfsManager {
}
}, EVT_NODE_LEFT, EVT_NODE_FAILED);
- igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
-
delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
"igfs-" + igfsName + "-delete-worker", log);
}
@@ -345,45 +336,11 @@ public class IgfsDataManager extends IgfsManager {
if (oldRmtReadFut == null) {
try {
- if (log.isDebugEnabled())
- log.debug("Reading non-local data block in the secondary file system [path=" +
- path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']');
-
- int blockSize = fileInfo.blockSize();
-
- long pos = blockIdx * blockSize; // Calculate position for Hadoop
-
- res = new byte[blockSize];
-
- int read = 0;
-
- synchronized (secReader) {
- try {
- // Delegate to the secondary file system.
- while (read < blockSize) {
- int r = secReader.read(pos + read, res, read, blockSize - read);
-
- if (r < 0)
- break;
-
- read += r;
- }
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to read data due to secondary file system " +
- "exception: " + e.getMessage(), e);
- }
- }
-
- // If we did not read full block at the end of the file - trim it.
- if (read != blockSize)
- res = Arrays.copyOf(res, read);
+ res = secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize());
rmtReadFut.onDone(res);
putBlock(fileInfo.blockSize(), key, res);
-
- igfsCtx.metrics().addReadBlocks(1, 1);
}
catch (IgniteCheckedException e) {
rmtReadFut.onDone(e);
@@ -417,11 +374,59 @@ public class IgfsDataManager extends IgfsManager {
}
/**
+ * Get data block for specified block index from secondary reader.
+ *
+ * @param path Path reading from.
+ * @param blockIdx Block index.
+ * @param secReader Optional secondary file system reader.
+ * @param blockSize Block size.
+ * @return Requested data block or {@code null} if nothing found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public byte[] secondaryDataBlock(IgfsPath path, long blockIdx,
+ IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Reading non-local data block in the secondary file system [path=" +
+ path + ", blockIdx=" + blockIdx + ']');
+
+ long pos = blockIdx * blockSize; // Calculate position for Hadoop
+
+ byte[] res = new byte[blockSize];
+
+ int read = 0;
+
+ try {
+ // Delegate to the secondary file system.
+ while (read < blockSize) {
+ int r = secReader.read(pos + read, res, read, blockSize - read);
+
+ if (r < 0)
+ break;
+
+ read += r;
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to read data due to secondary file system " +
+ "exception: " + e.getMessage(), e);
+ }
+
+ // If we did not read full block at the end of the file - trim it.
+ if (read != blockSize)
+ res = Arrays.copyOf(res, read);
+
+ igfsCtx.metrics().addReadBlocks(1, 1);
+
+ return res;
+ }
+
+ /**
* Stores the given block in data cache.
*
* @param blockSize The size of the block.
* @param key The data cache key of the block.
* @param data The new value of the block.
+ * @throws IgniteCheckedException If failed.
*/
private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
if (data.length < blockSize)
@@ -967,8 +972,8 @@ public class IgfsDataManager extends IgfsManager {
}
}
else {
- callIgfsLocalSafe(new GridPlainCallable<Object>() {
- @Override @Nullable public Object call() throws Exception {
+ igfsCtx.runInIgfsThreadPool(new Runnable() {
+ @Override public void run() {
storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
@@ -981,8 +986,6 @@ public class IgfsDataManager extends IgfsManager {
}
}
});
-
- return null;
}
});
}
@@ -1070,28 +1073,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Executes callable in IGFS executor service. If execution rejected, callable will be executed
- * in caller thread.
- *
- * @param c Callable to execute.
- */
- private <T> void callIgfsLocalSafe(Callable<T> c) {
- try {
- igfsSvc.submit(c);
- }
- catch (RejectedExecutionException ignored) {
- // This exception will happen if network speed is too low and data comes faster
- // than we can send it to remote nodes.
- try {
- c.call();
- }
- catch (Exception e) {
- log.warning("Failed to execute IGFS callable: " + c, e);
- }
- }
- }
-
- /**
* @param blocks Blocks to write.
* @return Future that will be completed after put is done.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 45596a3..87a4699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -949,34 +950,79 @@ public final class IgfsImpl implements IgfsEx {
IgfsMode mode = resolveMode(path);
- if (mode != PRIMARY) {
- assert IgfsUtils.isDualMode(mode);
+ switch (mode) {
+ case PRIMARY: {
+ IgfsEntryInfo info = meta.infoForPath(path);
- IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+ if (info == null)
+ throw new IgfsPathNotFoundException("File not found: " + path);
- IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(),
- cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader());
+ if (!info.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+ // Input stream to read data from grid cache with separate blocks.
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null,
+ info.length(), info.blockSize(), info.blocksCount(), false);
- return os;
- }
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
- IgfsEntryInfo info = meta.infoForPath(path);
+ return os;
+ }
- if (info == null)
- throw new IgfsPathNotFoundException("File not found: " + path);
+ case DUAL_ASYNC:
+ case DUAL_SYNC: {
+ assert IgfsUtils.isDualMode(mode);
- if (!info.isFile())
- throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+ IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+
+ IgfsEntryInfo info = desc.info();
- // Input stream to read data from grid cache with separate blocks.
- IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
- cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null);
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(),
+ info.length(), info.blockSize(), info.blocksCount(), false);
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+ return os;
+ }
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+ case PROXY: {
+ assert secondaryFs != null;
- return os;
+ IgfsFile info = info(path);
+
+ if (info == null)
+ throw new IgfsPathNotFoundException("File not found: " + path);
+
+ if (!info.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+
+ IgfsSecondaryFileSystemPositionedReadable secReader =
+ new IgfsLazySecondaryFileSystemPositionedReadable(secondaryFs, path, bufSize);
+
+ long len = info.length();
+
+ int blockSize = info.blockSize() > 0 ? info.blockSize() : cfg.getBlockSize();
+
+ long blockCnt = len / blockSize;
+
+ if (len % blockSize != 0)
+ blockCnt++;
+
+ IgfsInputStream os = new IgfsInputStreamImpl(igfsCtx, path, null,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, secReader,
+ info.length(), blockSize, blockCnt, true);
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+ return os;
+ }
+
+ default:
+ assert false : "Unexpected mode " + mode;
+ return null;
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index 2f9f2fc..0d9f2cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
@@ -109,21 +110,44 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** Time consumed on reading. */
private long time;
+ /** File Length. */
+ private long len;
+
+ /** Block size to read. */
+ private int blockSize;
+
+ /** Block size to read. */
+ private long blocksCnt;
+
+ /** Proxy mode. */
+ private boolean proxy;
+
/**
* Constructs file output stream.
- *
- * @param igfsCtx IGFS context.
+ * @param igfsCtx IGFS context.
* @param path Path to stored file.
* @param fileInfo File info to write binary data to.
* @param prefetchBlocks Number of blocks to prefetch.
* @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
* @param secReader Optional secondary file system reader.
+ * @param len File length.
+ * @param blockSize Block size.
+ * @param blocksCnt Blocks count.
+ * @param proxy Proxy mode flag.
*/
- IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks,
- int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) {
+ IgfsInputStreamImpl(
+ IgfsContext igfsCtx,
+ IgfsPath path,
+ @Nullable IgfsEntryInfo fileInfo,
+ int prefetchBlocks,
+ int seqReadsBeforePrefetch,
+ @Nullable IgfsSecondaryFileSystemPositionedReadable secReader,
+ long len,
+ int blockSize,
+ long blocksCnt,
+ boolean proxy) {
assert igfsCtx != null;
assert path != null;
- assert fileInfo != null;
this.igfsCtx = igfsCtx;
this.path = path;
@@ -131,6 +155,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
this.prefetchBlocks = prefetchBlocks;
this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
this.secReader = secReader;
+ this.len = len;
+ this.blockSize = blockSize;
+ this.blocksCnt = blocksCnt;
+ this.proxy = proxy;
log = igfsCtx.kernalContext().log(IgfsInputStream.class);
@@ -154,7 +182,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** {@inheritDoc} */
@Override public long length() {
- return fileInfo.length();
+ return len;
}
/** {@inheritDoc} */
@@ -195,7 +223,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** {@inheritDoc} */
@Override public synchronized int available() throws IOException {
- long l = fileInfo.length() - pos;
+ long l = len - pos;
if (l < 0)
return 0;
@@ -240,7 +268,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
@SuppressWarnings("IfMayBeConditional")
public synchronized byte[][] readChunks(long pos, int len) throws IOException {
// Readable bytes in the file, starting from the specified position.
- long readable = fileInfo.length() - pos;
+ long readable = this.len - pos;
if (readable <= 0)
return EMPTY_CHUNKS;
@@ -254,8 +282,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
bytes += len;
- int start = (int)(pos / fileInfo.blockSize());
- int end = (int)((pos + len - 1) / fileInfo.blockSize());
+ int start = (int)(pos / blockSize);
+ int end = (int)((pos + len - 1) / blockSize);
int chunkCnt = end - start + 1;
@@ -264,7 +292,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
for (int i = 0; i < chunkCnt; i++) {
byte[] block = blockFragmentizerSafe(start + i);
- int blockOff = (int)(pos % fileInfo.blockSize());
+ int blockOff = (int)(pos % blockSize);
int blockLen = Math.min(len, block.length - blockOff);
// If whole block can be used as result, do not do array copy.
@@ -366,7 +394,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
return 0; // Fully read done: read zero bytes correctly.
// Readable bytes in the file, starting from the specified position.
- long readable = fileInfo.length() - pos;
+ long readable = this.len - pos;
if (readable <= 0)
return -1; // EOF.
@@ -378,10 +406,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
assert len > 0;
- byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize());
+ byte[] block = blockFragmentizerSafe(pos / blockSize);
// Skip bytes to expected position.
- int blockOff = (int)(pos % fileInfo.blockSize());
+ int blockOff = (int)(pos % blockSize);
len = Math.min(len, block.length - blockOff);
@@ -412,7 +440,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']');
// This failure may be caused by file being fragmented.
- if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
+ if (fileInfo != null && fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id());
// File was deleted.
@@ -459,7 +487,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
prevBlockIdx = blockIdx;
- bytesFut = dataBlock(fileInfo, blockIdx);
+ bytesFut = dataBlock(blockIdx);
assert bytesFut != null;
@@ -470,10 +498,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) {
for (int i = 1; i <= prefetchBlocks; i++) {
// Ensure that we do not prefetch over file size.
- if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length())
+ if (blockSize * (i + blockIdx) >= len)
break;
else if (locCache.get(blockIdx + i) == null)
- addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i));
+ addLocalCacheFuture(blockIdx + i, dataBlock(blockIdx + i));
}
}
@@ -483,17 +511,17 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
throw new IgfsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " +
"[path=" + path + ", blockIdx=" + blockIdx + ']');
- int blockSize = fileInfo.blockSize();
+ int blockSize0 = blockSize;
- if (blockIdx == fileInfo.blocksCount() - 1)
- blockSize = (int)(fileInfo.length() % blockSize);
+ if (blockIdx == blocksCnt - 1)
+ blockSize0 = (int)(len % blockSize0);
// If part of the file was reserved for writing, but was not actually written.
- if (bytes.length < blockSize)
+ if (bytes.length < blockSize0)
throw new IOException("Inconsistent file's data block (incorrectly written?)" +
" [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length +
- ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() +
- ", fileLen=" + fileInfo.length() + ']');
+ ", expectedBlockSize=" + blockSize0 + ", fileBlockSize=" + blockSize +
+ ", fileLen=" + len + ']');
return bytes;
}
@@ -538,14 +566,35 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/**
* Get data block for specified block index.
*
- * @param fileInfo File info.
* @param blockIdx Block index.
* @return Requested data block or {@code null} if nothing found.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx)
+ @Nullable protected IgniteInternalFuture<byte[]> dataBlock(final long blockIdx)
throws IgniteCheckedException {
- return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+ if (proxy) {
+ assert secReader != null;
+
+ final GridFutureAdapter<byte[]> fut = new GridFutureAdapter<>();
+
+ igfsCtx.runInIgfsThreadPool(new Runnable() {
+ @Override public void run() {
+ try {
+ fut.onDone(igfsCtx.data().secondaryDataBlock(path, blockIdx, secReader, blockSize));
+ }
+ catch (Throwable e) {
+ fut.onDone(null, e);
+ }
+ }
+ });
+
+ return fut;
+ }
+ else {
+ assert fileInfo != null;
+
+ return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+ }
}
/** {@inheritDoc} */
[4/8] ignite git commit: Added missing header to
BinaryObjectToStringSelfTest.
Posted by ak...@apache.org.
Added missing header to BinaryObjectToStringSelfTest.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/135f0a8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/135f0a8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/135f0a8a
Branch: refs/heads/master
Commit: 135f0a8a39fb6895fada18d210260deebfb9426d
Parents: c1372ce
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 10:33:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 10:33:11 2016 +0300
----------------------------------------------------------------------
.../binary/BinaryObjectToStringSelfTest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/135f0a8a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
index cc6cf8b..df6bcde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.binary;
import java.util.Arrays;
[5/8] ignite git commit: Merge branch ignite-1.6.8 into ignite-1.6.9.
Posted by ak...@apache.org.
Merge branch ignite-1.6.8 into ignite-1.6.9.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b3ba8b88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b3ba8b88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b3ba8b88
Branch: refs/heads/master
Commit: b3ba8b886bd6e582a860d836da093918dc521745
Parents: 5a35ee9 135f0a8
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:20:07 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:20:07 2016 +0700
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 12 +
.../internal/GridEventConsumeHandler.java | 5 -
.../internal/GridMessageListenHandler.java | 5 -
.../internal/binary/BinaryObjectExImpl.java | 161 ++++++---
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheEntryEx.java | 8 +
.../processors/cache/GridCacheMapEntry.java | 9 +-
.../GridCacheReturnCompletableWrapper.java | 101 ++++++
.../cache/GridDeferredAckMessageSender.java | 219 ++++++++++++
.../GridDistributedTxRemoteAdapter.java | 65 +++-
.../distributed/dht/GridDhtTxFinishFuture.java | 12 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 33 +-
.../dht/GridDhtTxFinishResponse.java | 52 ++-
.../dht/GridDhtTxOnePhaseCommitAckRequest.java | 134 +++++++
.../distributed/dht/GridDhtTxPrepareFuture.java | 42 ++-
.../dht/GridDhtTxPrepareRequest.java | 93 +++--
.../cache/distributed/dht/GridDhtTxRemote.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 227 +++---------
...arOptimisticSerializableTxPrepareFuture.java | 4 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 7 +-
.../GridNearPessimisticTxPrepareFuture.java | 4 +-
.../near/GridNearTxFinishFuture.java | 112 +++++-
.../continuous/CacheContinuousQueryHandler.java | 5 -
.../cache/transactions/IgniteTxAdapter.java | 46 ++-
.../cache/transactions/IgniteTxEntry.java | 44 ++-
.../cache/transactions/IgniteTxHandler.java | 163 +++++++--
.../transactions/IgniteTxLocalAdapter.java | 27 +-
.../cache/transactions/IgniteTxManager.java | 154 +++++++-
.../continuous/GridContinuousHandler.java | 8 -
.../continuous/GridContinuousProcessor.java | 33 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 18 +-
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +-
.../binary/BinaryObjectToStringSelfTest.java | 92 +++++
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++
...idAbstractCacheInterceptorRebalanceTest.java | 356 +++++++++++++++++++
...heInterceptorAtomicOffheapRebalanceTest.java | 30 ++
...GridCacheInterceptorAtomicRebalanceTest.java | 36 ++
...ceptorTransactionalOffheapRebalanceTest.java | 35 ++
...heInterceptorTransactionalRebalanceTest.java | 36 ++
.../processors/cache/GridCacheTestEntryEx.java | 4 +
.../IgniteCacheInterceptorSelfTestSuite.java | 5 +
.../IgniteCachePutRetryAbstractSelfTest.java | 39 +-
...gniteCachePutRetryTransactionalSelfTest.java | 75 +++-
...ContinuousQueryFailoverAbstractSelfTest.java | 99 ++++++
...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 +++++
.../IgniteBinaryObjectsTestSuite.java | 2 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
.../config/benchmark-client-mode.properties | 2 +
.../config/benchmark-tx-win.properties | 2 +
.../yardstick/config/benchmark-tx.properties | 2 +
.../yardstick/config/benchmark-win.properties | 2 +
modules/yardstick/config/benchmark.properties | 2 +
.../cache/IgniteGetAndPutBenchmark.java | 41 +++
.../cache/IgniteGetAndPutTxBenchmark.java | 70 ++++
.../cache/IgniteInvokeTxBenchmark.java | 40 +++
56 files changed, 2663 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
[2/8] ignite git commit: Client discovery: wait during join if
receive RES_CONTINUE_JOIN, RES_WAIT.
Posted by ak...@apache.org.
Client discovery: wait during join if receive RES_CONTINUE_JOIN, RES_WAIT.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1372ce2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1372ce2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1372ce2
Branch: refs/heads/master
Commit: c1372ce2f0633968036fcfb079718214605c3350
Parents: 780bf23
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 20 11:39:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 20 11:39:37 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c1372ce2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index bf7f519..2c85645 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -497,6 +497,8 @@ class ClientImpl extends TcpDiscoveryImpl {
Iterator<InetSocketAddress> it = addrs.iterator();
+ boolean wait = false;
+
while (it.hasNext()) {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
@@ -515,12 +517,17 @@ class ClientImpl extends TcpDiscoveryImpl {
Socket sock = sockAndRes.get1().socket();
+ if (log.isDebugEnabled())
+ log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
+
switch (sockAndRes.get2()) {
case RES_OK:
return new T2<>(sockAndRes.get1(), sockAndRes.get3());
case RES_CONTINUE_JOIN:
case RES_WAIT:
+ wait = true;
+
U.closeQuiet(sock);
break;
@@ -533,7 +540,16 @@ class ClientImpl extends TcpDiscoveryImpl {
}
}
- if (addrs.isEmpty()) {
+ if (wait) {
+ if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+ return null;
+
+ if (log.isDebugEnabled())
+ log.debug("Will wait before retry join.");
+
+ Thread.sleep(2000);
+ }
+ else if (addrs.isEmpty()) {
if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
return null;
[7/8] ignite git commit: Merge ignite-1.6.9 into ignite-1.7.2.
Posted by ak...@apache.org.
Merge ignite-1.6.9 into ignite-1.7.2.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85c47ebf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85c47ebf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85c47ebf
Branch: refs/heads/master
Commit: 85c47ebf2ed8773f699c9cce093cc79696514494
Parents: 83ff7c7 4ee52f0
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:45:19 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:45:19 2016 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 42 ++++---
.../cache/store/jdbc/CacheJdbcPojoStore.java | 5 +-
.../internal/processors/igfs/IgfsContext.java | 35 ++++++
.../processors/igfs/IgfsDataManager.java | 121 ++++++++-----------
.../internal/processors/igfs/IgfsImpl.java | 82 ++++++++++---
.../processors/igfs/IgfsInputStreamImpl.java | 103 +++++++++++-----
.../ignite/spi/discovery/tcp/ClientImpl.java | 18 ++-
.../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +++++-
.../binary/BinaryObjectToStringSelfTest.java | 17 +++
.../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++++++
.../file/GridFileSwapSpaceSpiSelfTest.java | 89 ++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
12 files changed, 447 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/85c47ebf/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/85c47ebf/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/85c47ebf/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
[6/8] ignite git commit: IGNITE-3936 Added check for already
processed key types on load cache. Added info message about started/finished
load cache. Improved exceptions messages.
Posted by ak...@apache.org.
IGNITE-3936 Added check for already processed key types on load cache. Added info message about started/finished load cache. Improved exceptions messages.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ee52f0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ee52f0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ee52f0a
Branch: refs/heads/master
Commit: 4ee52f0a50d9cf8bc64a277f2d02600a832d6ca6
Parents: b3ba8b8
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:37:52 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:37:52 2016 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 42 ++++++++++++--------
.../cache/store/jdbc/CacheJdbcPojoStore.java | 5 ++-
2 files changed, 29 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee52f0a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index a33a1e6..fe8a50b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -827,17 +827,31 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
throw new CacheLoaderException("Provided key type is not found in store or cache configuration " +
"[cache=" + U.maskName(cacheName) + ", key=" + keyType + "]");
- String selQry = args[i + 1].toString();
+ String qry = args[i + 1].toString();
EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType));
- futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
+ if (log.isInfoEnabled())
+ log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) +
+ ", keyType=" + keyType + ", query=" + qry + "]");
+
+ futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, qry, clo)));
}
}
else {
- Collection<EntryMapping> entryMappings = mappings.values();
+ Collection<String> processedKeyTypes = new HashSet<>();
+
+ for (EntryMapping em : mappings.values()) {
+ String keyType = em.keyType();
+
+ if (processedKeyTypes.contains(keyType))
+ continue;
+
+ processedKeyTypes.add(keyType);
+
+ if (log.isInfoEnabled())
+ log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + "]");
- for (EntryMapping em : entryMappings) {
if (parallelLoadCacheMinThreshold > 0) {
Connection conn = null;
@@ -853,7 +867,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (rs.next()) {
if (log.isDebugEnabled())
log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
- ", keyType=" + em.keyType() + " ]");
+ ", keyType=" + keyType + "]");
int keyCnt = em.keyCols.size();
@@ -876,13 +890,13 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
-
- continue;
}
+
+ continue;
}
catch (SQLException e) {
- log.warning("Failed to load entries from db in multithreaded mode " +
- "[cache=" + U.maskName(cacheName) + ", keyType=" + em.keyType() + " ]", e);
+ log.warning("Failed to load entries from db in multithreaded mode, will try in single thread " +
+ "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + " ]", e);
}
finally {
U.closeQuiet(conn);
@@ -891,7 +905,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (log.isDebugEnabled())
log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
- ", keyType=" + em.keyType() + " ]");
+ ", keyType=" + keyType + "]");
futs.add(pool.submit(loadCacheFull(em, clo)));
}
@@ -900,8 +914,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (Future<?> fut : futs)
U.get(fut);
- if (log.isDebugEnabled())
- log.debug("Cache loaded from db: " + U.maskName(cacheName));
+ if (log.isInfoEnabled())
+ log.info("Finished load cache: " + U.maskName(cacheName));
}
catch (IgniteCheckedException e) {
throw new CacheLoaderException("Failed to load cache: " + U.maskName(cacheName), e.getCause());
@@ -1941,10 +1955,6 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** {@inheritDoc} */
@Override public Void call() throws Exception {
- if (log.isDebugEnabled())
- log.debug("Load cache using custom query [cache= " + U.maskName(em.cacheName) +
- ", keyType=" + em.keyType() + ", query=" + qry + "]");
-
Connection conn = null;
PreparedStatement stmt = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee52f0a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index 798b84a..dd3e812 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -102,7 +102,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
return prop.get(obj);
}
catch (Exception e) {
- throw new CacheException("Failed to read object of class: " + typeName, e);
+ throw new CacheException("Failed to read object property [cache=" + U.maskName(cacheName) +
+ ", type=" + typeName + ", prop=" + fldName + "]", e);
}
}
@@ -262,7 +263,7 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
return builder.build();
}
catch (SQLException e) {
- throw new CacheException("Failed to read binary object", e);
+ throw new CacheException("Failed to read binary object: " + typeName, e);
}
}