You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/21 08:58:24 UTC

[1/8] ignite git commit: ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored

Repository: ignite
Updated Branches:
  refs/heads/master bcbe8cc44 -> 86d31537f


ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d

Branch: refs/heads/master
Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2
Parents: 9b72d18
Author: sboikov <sb...@gridgain.com>
Authored: Mon Sep 19 18:19:33 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Sep 19 18:19:33 2016 +0300

----------------------------------------------------------------------
 .../spi/swapspace/file/FileSwapSpaceSpi.java    | 38 +++++++--
 .../CacheSwapUnswapGetTestSmallQueueSize.java   | 35 ++++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      | 89 ++++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |  2 +
 4 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
index 8809f08..9be5b93 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java
@@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
         if (space == null && create) {
             validateName(name);
 
-            Space old = spaces.putIfAbsent(masked, space = new Space(masked));
+            Space old = spaces.putIfAbsent(masked, space = new Space(masked, log));
 
             if (old != null)
                 space = old;
@@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
         /** */
         private final int maxSize;
 
+        /** */
+        private final IgniteLogger log;
+
+        /** */
+        private boolean queueSizeWarn;
+
         /**
          * @param minTakeSize Min size.
          * @param maxSize Max size.
+         * @param log logger
          */
-        private SwapValuesQueue(int minTakeSize, int maxSize) {
+        private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) {
             this.minTakeSize = minTakeSize;
             this.maxSize = maxSize;
+            this.log = log;
         }
 
         /**
@@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
             lock.lock();
 
             try {
-                while (size + val.len > maxSize)
-                    mayAdd.await();
+                boolean largeVal = val.len > maxSize;
+
+                if (largeVal) {
+                    if (!queueSizeWarn) {
+                        U.warn(log, "Trying to save in swap entry which have size more than write queue size. " +
+                            "You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration " +
+                            "[queueMaxSize=" + maxSize + ", valSize=" + val.len + ']');
+
+                        queueSizeWarn = true;
+                    }
+
+                    while (size >= minTakeSize)
+                        mayAdd.await();
+                }
+                else {
+                    while (size + val.len > maxSize)
+                        mayAdd.await();
+                }
 
                 size += val.len;
 
@@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
         private SwapFile right;
 
         /** */
-        private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize);
+        private final SwapValuesQueue que;
 
         /** Partitions. */
         private final ConcurrentMap<Integer, ConcurrentMap<SwapKey, SwapValue>> parts =
@@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi,
 
         /**
          * @param name Space name.
+         * @param log Logger.
          */
-        private Space(String name) {
+        private Space(String name, IgniteLogger log) {
             assert name != null;
 
             this.name = name;
+            this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
new file mode 100644
index 0000000..8d189fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+
+/**
+ *
+ */
+public class CacheSwapUnswapGetTestSmallQueueSize extends CacheSwapUnswapGetTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
index 64652b1..ab21165 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java
@@ -25,11 +25,14 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest;
 import org.apache.ignite.spi.swapspace.SwapKey;
 import org.apache.ignite.spi.swapspace.SwapSpaceSpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.junit.Assert;
 
 /**
  * Test for {@link FileSwapSpaceSpi}.
@@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe
 
         assertEquals(hash0, hash1);
     }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSaveValueLargeThenQueueSize() throws IgniteCheckedException {
+        final String spaceName = "mySpace";
+        final SwapKey key = new SwapKey("key");
+
+        final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2];
+        Arrays.fill(val, (byte)1);
+
+        IgniteInternalFuture<byte[]> fut = GridTestUtils.runAsync(new Callable<byte[]>() {
+            @Override public byte[] call() throws Exception {
+                return saveAndGet(spaceName, key, val);
+            }
+        });
+
+        byte[] bytes = fut.get(10_000);
+
+        Assert.assertArrayEquals(val, bytes);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSaveValueLargeThenQueueSizeMultiThreaded() throws Exception {
+        final String spaceName = "mySpace";
+
+        final int threads = 5;
+
+        long DURATION = 30_000;
+
+        final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2;
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (!done.get()) {
+                        SwapKey key = new SwapKey(rnd.nextInt(1000));
+
+                        spi.store(spaceName, key, new byte[rnd.nextInt(0, maxSize)], context());
+                    }
+
+                    return null;
+                }
+            }, threads, " async-put");
+
+            Thread.sleep(DURATION);
+
+            done.set(true);
+
+            fut.get();
+        }
+        finally {
+           done.set(true);
+        }
+    }
+
+    /**
+     * @param spaceName Space name.
+     * @param key Key.
+     * @param val Value.
+     * @throws Exception If failed.
+     * @return Read bytes.
+     */
+    private byte[] saveAndGet(final String spaceName, final SwapKey key, byte[] val) throws Exception {
+        spi.store(spaceName, key, val, context());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return spi.read(spaceName, key, context()) != null;
+            }
+        }, 10_000);
+
+        byte[] res = spi.read(spaceName, key, context());
+
+        assertNotNull(res);
+
+        return res;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 60d59d7..c494e73 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest;
 import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest;
 import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest;
+import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize;
 import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest;
 import org.apache.ignite.internal.processors.cache.CrossCacheLockTest;
 import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest;
@@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class);
 
         suite.addTestSuite(CacheSwapUnswapGetTest.class);
+        suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class);
 
         suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
         suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);


[8/8] ignite git commit: Merge branch ignite-1.7.2 into master.

Posted by ak...@apache.org.
Merge branch ignite-1.7.2 into master.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86d31537
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86d31537
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86d31537

Branch: refs/heads/master
Commit: 86d31537f8f9485bbc197f197b6ddd1cada46217
Parents: bcbe8cc 85c47eb
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:59:04 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:59:04 2016 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      |  42 ++++---
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |   5 +-
 .../internal/processors/igfs/IgfsContext.java   |  35 ++++++
 .../processors/igfs/IgfsDataManager.java        | 121 ++++++++-----------
 .../internal/processors/igfs/IgfsImpl.java      |  82 ++++++++++---
 .../processors/igfs/IgfsInputStreamImpl.java    | 103 +++++++++++-----
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  18 ++-
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |  38 +++++-
 .../binary/BinaryObjectToStringSelfTest.java    |  17 +++
 .../CacheSwapUnswapGetTestSmallQueueSize.java   |  35 ++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      |  89 ++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 12 files changed, 447 insertions(+), 140 deletions(-)
----------------------------------------------------------------------



[3/8] ignite git commit: IGNITE-3859: IGFS: Support direct PROXY mode invocation in the open method, add proxy mode to IgfsInputStreamImpl This closes #1065. This closes #1083.

Posted by ak...@apache.org.
IGNITE-3859:  IGFS:  Support direct PROXY mode invocation in the open method, add proxy mode to IgfsInputStreamImpl
This closes #1065. This closes #1083.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a35ee9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a35ee9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a35ee9d

Branch: refs/heads/master
Commit: 5a35ee9dad194b3009151b79f0ebd3976bb8fd22
Parents: 2474e2b
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Tue Sep 20 14:10:55 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Tue Sep 20 14:10:55 2016 +0500

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsContext.java   |  35 ++++++
 .../processors/igfs/IgfsDataManager.java        | 121 ++++++++-----------
 .../internal/processors/igfs/IgfsImpl.java      |  82 ++++++++++---
 .../processors/igfs/IgfsInputStreamImpl.java    | 103 +++++++++++-----
 4 files changed, 226 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3e01246..3405b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.igfs;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
@@ -60,6 +63,12 @@ public class IgfsContext {
     /** Local cluster node. */
     private volatile ClusterNode locNode;
 
+    /** IGFS executor service. */
+    private ExecutorService igfsSvc;
+
+    /** Logger. */
+    protected IgniteLogger log;
+
     /**
      * @param ctx Kernal context.
      * @param cfg IGFS configuration.
@@ -85,6 +94,10 @@ public class IgfsContext {
         this.srvMgr = add(srvMgr);
         this.fragmentizerMgr = add(fragmentizerMgr);
 
+        log = ctx.log(IgfsContext.class);
+
+        igfsSvc = ctx.getIgfsExecutorService();
+
         igfs = new IgfsImpl(this);
     }
 
@@ -206,6 +219,28 @@ public class IgfsContext {
     }
 
     /**
+     * Executes runnable in IGFS executor service. If execution rejected, runnable will be executed
+     * in caller thread.
+     *
+     * @param r Runnable to execute.
+     */
+    public void runInIgfsThreadPool(Runnable r) {
+        try {
+            igfsSvc.submit(r);
+        }
+        catch (RejectedExecutionException ignored) {
+            // This exception will happen if network speed is too low and data comes faster
+            // than we can send it to remote nodes.
+            try {
+                r.run();
+            }
+            catch (Exception e) {
+                log.warning("Failed to execute IGFS runnable: " + r, e);
+            }
+        }
+    }
+
+    /**
      * Adds manager to managers list.
      *
      * @param mgr Manager.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d2183f9..2f704ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -74,12 +73,9 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -123,9 +119,6 @@ public class IgfsDataManager extends IgfsManager {
     /** Affinity key generator. */
     private AtomicLong affKeyGen = new AtomicLong();
 
-    /** IGFS executor service. */
-    private ExecutorService igfsSvc;
-
     /** Request ID counter for write messages. */
     private AtomicLong reqIdCtr = new AtomicLong();
 
@@ -183,8 +176,6 @@ public class IgfsDataManager extends IgfsManager {
             }
         }, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
-        igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
-
         delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
             "igfs-" + igfsName + "-delete-worker", log);
     }
@@ -345,45 +336,11 @@ public class IgfsDataManager extends IgfsManager {
 
                         if (oldRmtReadFut == null) {
                             try {
-                                if (log.isDebugEnabled())
-                                    log.debug("Reading non-local data block in the secondary file system [path=" +
-                                        path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']');
-
-                                int blockSize = fileInfo.blockSize();
-
-                                long pos = blockIdx * blockSize; // Calculate position for Hadoop
-
-                                res = new byte[blockSize];
-
-                                int read = 0;
-
-                                synchronized (secReader) {
-                                    try {
-                                        // Delegate to the secondary file system.
-                                        while (read < blockSize) {
-                                            int r = secReader.read(pos + read, res, read, blockSize - read);
-
-                                            if (r < 0)
-                                                break;
-
-                                            read += r;
-                                        }
-                                    }
-                                    catch (IOException e) {
-                                        throw new IgniteCheckedException("Failed to read data due to secondary file system " +
-                                            "exception: " + e.getMessage(), e);
-                                    }
-                                }
-
-                                // If we did not read full block at the end of the file - trim it.
-                                if (read != blockSize)
-                                    res = Arrays.copyOf(res, read);
+                                res = secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize());
 
                                 rmtReadFut.onDone(res);
 
                                 putBlock(fileInfo.blockSize(), key, res);
-
-                                igfsCtx.metrics().addReadBlocks(1, 1);
                             }
                             catch (IgniteCheckedException e) {
                                 rmtReadFut.onDone(e);
@@ -417,11 +374,59 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
+     * Get data block for specified block index from secondary reader.
+     *
+     * @param path Path reading from.
+     * @param blockIdx Block index.
+     * @param secReader Optional secondary file system reader.
+     * @param blockSize Block size.
+     * @return Requested data block or {@code null} if nothing found.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public byte[] secondaryDataBlock(IgfsPath path, long blockIdx,
+        IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Reading non-local data block in the secondary file system [path=" +
+                path + ", blockIdx=" + blockIdx + ']');
+
+        long pos = blockIdx * blockSize; // Calculate position for Hadoop
+
+        byte[] res = new byte[blockSize];
+
+        int read = 0;
+
+        try {
+            // Delegate to the secondary file system.
+            while (read < blockSize) {
+                int r = secReader.read(pos + read, res, read, blockSize - read);
+
+                if (r < 0)
+                    break;
+
+                read += r;
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to read data due to secondary file system " +
+                "exception: " + e.getMessage(), e);
+        }
+
+        // If we did not read full block at the end of the file - trim it.
+        if (read != blockSize)
+            res = Arrays.copyOf(res, read);
+
+        igfsCtx.metrics().addReadBlocks(1, 1);
+
+        return res;
+    }
+
+    /**
      * Stores the given block in data cache.
      *
      * @param blockSize The size of the block.
      * @param key The data cache key of the block.
      * @param data The new value of the block.
+     * @throws IgniteCheckedException If failed.
      */
     private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
         if (data.length < blockSize)
@@ -967,8 +972,8 @@ public class IgfsDataManager extends IgfsManager {
             }
         }
         else {
-            callIgfsLocalSafe(new GridPlainCallable<Object>() {
-                @Override @Nullable public Object call() throws Exception {
+            igfsCtx.runInIgfsThreadPool(new Runnable() {
+                @Override public void run() {
                     storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() {
                         @Override public void apply(IgniteInternalFuture<?> fut) {
                             try {
@@ -981,8 +986,6 @@ public class IgfsDataManager extends IgfsManager {
                             }
                         }
                     });
-
-                    return null;
                 }
             });
         }
@@ -1070,28 +1073,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Executes callable in IGFS executor service. If execution rejected, callable will be executed
-     * in caller thread.
-     *
-     * @param c Callable to execute.
-     */
-    private <T> void callIgfsLocalSafe(Callable<T> c) {
-        try {
-            igfsSvc.submit(c);
-        }
-        catch (RejectedExecutionException ignored) {
-            // This exception will happen if network speed is too low and data comes faster
-            // than we can send it to remote nodes.
-            try {
-                c.call();
-            }
-            catch (Exception e) {
-                log.warning("Failed to execute IGFS callable: " + c, e);
-            }
-        }
-    }
-
-    /**
      * @param blocks Blocks to write.
      * @return Future that will be completed after put is done.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 45596a3..87a4699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.igfs.IgfsPathSummary;
 import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -949,34 +950,79 @@ public final class IgfsImpl implements IgfsEx {
 
                 IgfsMode mode = resolveMode(path);
 
-                if (mode != PRIMARY) {
-                    assert IgfsUtils.isDualMode(mode);
+                switch (mode) {
+                    case PRIMARY: {
+                        IgfsEntryInfo info = meta.infoForPath(path);
 
-                    IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+                        if (info == null)
+                            throw new IgfsPathNotFoundException("File not found: " + path);
 
-                    IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(),
-                        cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader());
+                        if (!info.isFile())
+                            throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
 
-                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+                        // Input stream to read data from grid cache with separate blocks.
+                        IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+                            cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null,
+                            info.length(), info.blockSize(), info.blocksCount(), false);
 
-                    return os;
-                }
+                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
 
-                IgfsEntryInfo info = meta.infoForPath(path);
+                        return os;
+                    }
 
-                if (info == null)
-                    throw new IgfsPathNotFoundException("File not found: " + path);
+                    case DUAL_ASYNC:
+                    case DUAL_SYNC: {
+                        assert IgfsUtils.isDualMode(mode);
 
-                if (!info.isFile())
-                    throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+                        IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+
+                        IgfsEntryInfo info = desc.info();
 
-                // Input stream to read data from grid cache with separate blocks.
-                IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
-                    cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null);
+                        IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+                            cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(),
+                            info.length(), info.blockSize(), info.blocksCount(), false);
+
+                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+                        return os;
+                    }
 
-                IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+                    case PROXY: {
+                        assert secondaryFs != null;
 
-                return os;
+                        IgfsFile info = info(path);
+
+                        if (info == null)
+                            throw new IgfsPathNotFoundException("File not found: " + path);
+
+                        if (!info.isFile())
+                            throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+
+                        IgfsSecondaryFileSystemPositionedReadable secReader =
+                            new IgfsLazySecondaryFileSystemPositionedReadable(secondaryFs, path, bufSize);
+
+                        long len = info.length();
+
+                        int blockSize = info.blockSize() > 0 ? info.blockSize() : cfg.getBlockSize();
+
+                        long blockCnt = len / blockSize;
+
+                        if (len % blockSize != 0)
+                            blockCnt++;
+
+                        IgfsInputStream os = new IgfsInputStreamImpl(igfsCtx, path, null,
+                            cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, secReader,
+                            info.length(), blockSize, blockCnt, true);
+
+                        IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+                        return os;
+                    }
+
+                    default:
+                        assert false : "Unexpected mode " + mode;
+                        return null;
+                }
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index 2f9f2fc..0d9f2cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -109,21 +110,44 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     /** Time consumed on reading. */
     private long time;
 
+    /** File Length. */
+    private long len;
+
+    /** Block size to read. */
+    private int blockSize;
+
+    /** Block size to read. */
+    private long blocksCnt;
+
+    /** Proxy mode. */
+    private boolean proxy;
+
     /**
      * Constructs file output stream.
-     *
-     * @param igfsCtx IGFS context.
+     *  @param igfsCtx IGFS context.
      * @param path Path to stored file.
      * @param fileInfo File info to write binary data to.
      * @param prefetchBlocks Number of blocks to prefetch.
      * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
      * @param secReader Optional secondary file system reader.
+     * @param len File length.
+     * @param blockSize Block size.
+     * @param blocksCnt Blocks count.
+     * @param proxy Proxy mode flag.
      */
-    IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks,
-        int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) {
+    IgfsInputStreamImpl(
+        IgfsContext igfsCtx,
+        IgfsPath path,
+        @Nullable IgfsEntryInfo fileInfo,
+        int prefetchBlocks,
+        int seqReadsBeforePrefetch,
+        @Nullable IgfsSecondaryFileSystemPositionedReadable secReader,
+        long len,
+        int blockSize,
+        long blocksCnt,
+        boolean proxy) {
         assert igfsCtx != null;
         assert path != null;
-        assert fileInfo != null;
 
         this.igfsCtx = igfsCtx;
         this.path = path;
@@ -131,6 +155,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         this.prefetchBlocks = prefetchBlocks;
         this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
         this.secReader = secReader;
+        this.len = len;
+        this.blockSize = blockSize;
+        this.blocksCnt = blocksCnt;
+        this.proxy = proxy;
 
         log = igfsCtx.kernalContext().log(IgfsInputStream.class);
 
@@ -154,7 +182,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
     /** {@inheritDoc} */
     @Override public long length() {
-        return fileInfo.length();
+        return len;
     }
 
     /** {@inheritDoc} */
@@ -195,7 +223,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
     /** {@inheritDoc} */
     @Override public synchronized int available() throws IOException {
-        long l = fileInfo.length() - pos;
+        long l = len - pos;
 
         if (l < 0)
             return 0;
@@ -240,7 +268,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     @SuppressWarnings("IfMayBeConditional")
     public synchronized byte[][] readChunks(long pos, int len) throws IOException {
         // Readable bytes in the file, starting from the specified position.
-        long readable = fileInfo.length() - pos;
+        long readable = this.len - pos;
 
         if (readable <= 0)
             return EMPTY_CHUNKS;
@@ -254,8 +282,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
         bytes += len;
 
-        int start = (int)(pos / fileInfo.blockSize());
-        int end = (int)((pos + len - 1) / fileInfo.blockSize());
+        int start = (int)(pos / blockSize);
+        int end = (int)((pos + len - 1) / blockSize);
 
         int chunkCnt = end - start + 1;
 
@@ -264,7 +292,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         for (int i = 0; i < chunkCnt; i++) {
             byte[] block = blockFragmentizerSafe(start + i);
 
-            int blockOff = (int)(pos % fileInfo.blockSize());
+            int blockOff = (int)(pos % blockSize);
             int blockLen = Math.min(len, block.length - blockOff);
 
             // If whole block can be used as result, do not do array copy.
@@ -366,7 +394,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
             return 0; // Fully read done: read zero bytes correctly.
 
         // Readable bytes in the file, starting from the specified position.
-        long readable = fileInfo.length() - pos;
+        long readable = this.len - pos;
 
         if (readable <= 0)
             return -1; // EOF.
@@ -378,10 +406,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
         assert len > 0;
 
-        byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize());
+        byte[] block = blockFragmentizerSafe(pos / blockSize);
 
         // Skip bytes to expected position.
-        int blockOff = (int)(pos % fileInfo.blockSize());
+        int blockOff = (int)(pos % blockSize);
 
         len = Math.min(len, block.length - blockOff);
 
@@ -412,7 +440,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
                         ", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']');
 
                 // This failure may be caused by file being fragmented.
-                if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
+                if (fileInfo != null && fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
                     IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id());
 
                     // File was deleted.
@@ -459,7 +487,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
             prevBlockIdx = blockIdx;
 
-            bytesFut = dataBlock(fileInfo, blockIdx);
+            bytesFut = dataBlock(blockIdx);
 
             assert bytesFut != null;
 
@@ -470,10 +498,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) {
             for (int i = 1; i <= prefetchBlocks; i++) {
                 // Ensure that we do not prefetch over file size.
-                if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length())
+                if (blockSize * (i + blockIdx) >= len)
                     break;
                 else if (locCache.get(blockIdx + i) == null)
-                    addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i));
+                    addLocalCacheFuture(blockIdx + i, dataBlock(blockIdx + i));
             }
         }
 
@@ -483,17 +511,17 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
             throw new IgfsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " +
                 "[path=" + path + ", blockIdx=" + blockIdx + ']');
 
-        int blockSize = fileInfo.blockSize();
+        int blockSize0 = blockSize;
 
-        if (blockIdx == fileInfo.blocksCount() - 1)
-            blockSize = (int)(fileInfo.length() % blockSize);
+        if (blockIdx == blocksCnt - 1)
+            blockSize0 = (int)(len % blockSize0);
 
         // If part of the file was reserved for writing, but was not actually written.
-        if (bytes.length < blockSize)
+        if (bytes.length < blockSize0)
             throw new IOException("Inconsistent file's data block (incorrectly written?)" +
                 " [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length +
-                ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() +
-                ", fileLen=" + fileInfo.length() + ']');
+                ", expectedBlockSize=" + blockSize0 + ", fileBlockSize=" + blockSize +
+                ", fileLen=" + len + ']');
 
         return bytes;
     }
@@ -538,14 +566,35 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     /**
      * Get data block for specified block index.
      *
-     * @param fileInfo File info.
      * @param blockIdx Block index.
      * @return Requested data block or {@code null} if nothing found.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx)
+    @Nullable protected IgniteInternalFuture<byte[]> dataBlock(final long blockIdx)
         throws IgniteCheckedException {
-        return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+        if (proxy) {
+            assert secReader != null;
+
+            final GridFutureAdapter<byte[]> fut = new GridFutureAdapter<>();
+
+            igfsCtx.runInIgfsThreadPool(new Runnable() {
+                @Override public void run() {
+                    try {
+                        fut.onDone(igfsCtx.data().secondaryDataBlock(path, blockIdx, secReader, blockSize));
+                    }
+                    catch (Throwable e) {
+                        fut.onDone(null, e);
+                    }
+                }
+            });
+
+            return fut;
+        }
+        else {
+            assert fileInfo != null;
+
+            return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+        }
     }
 
     /** {@inheritDoc} */


[4/8] ignite git commit: Added missing header to BinaryObjectToStringSelfTest.

Posted by ak...@apache.org.
Added missing header to BinaryObjectToStringSelfTest.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/135f0a8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/135f0a8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/135f0a8a

Branch: refs/heads/master
Commit: 135f0a8a39fb6895fada18d210260deebfb9426d
Parents: c1372ce
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 10:33:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 10:33:11 2016 +0300

----------------------------------------------------------------------
 .../binary/BinaryObjectToStringSelfTest.java       | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/135f0a8a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
index cc6cf8b..df6bcde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectToStringSelfTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.binary;
 
 import java.util.Arrays;


[5/8] ignite git commit: Merge branch ignite-1.6.8 into ignite-1.6.9.

Posted by ak...@apache.org.
Merge branch ignite-1.6.8 into ignite-1.6.9.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b3ba8b88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b3ba8b88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b3ba8b88

Branch: refs/heads/master
Commit: b3ba8b886bd6e582a860d836da093918dc521745
Parents: 5a35ee9 135f0a8
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:20:07 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:20:07 2016 +0700

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +
 .../internal/GridEventConsumeHandler.java       |   5 -
 .../internal/GridMessageListenHandler.java      |   5 -
 .../internal/binary/BinaryObjectExImpl.java     | 161 ++++++---
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheEntryEx.java      |   8 +
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../GridCacheReturnCompletableWrapper.java      | 101 ++++++
 .../cache/GridDeferredAckMessageSender.java     | 219 ++++++++++++
 .../GridDistributedTxRemoteAdapter.java         |  65 +++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  33 +-
 .../dht/GridDhtTxFinishResponse.java            |  52 ++-
 .../dht/GridDhtTxOnePhaseCommitAckRequest.java  | 134 +++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java |  42 ++-
 .../dht/GridDhtTxPrepareRequest.java            |  93 +++--
 .../cache/distributed/dht/GridDhtTxRemote.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 227 +++---------
 ...arOptimisticSerializableTxPrepareFuture.java |   4 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   7 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   4 +-
 .../near/GridNearTxFinishFuture.java            | 112 +++++-
 .../continuous/CacheContinuousQueryHandler.java |   5 -
 .../cache/transactions/IgniteTxAdapter.java     |  46 ++-
 .../cache/transactions/IgniteTxEntry.java       |  44 ++-
 .../cache/transactions/IgniteTxHandler.java     | 163 +++++++--
 .../transactions/IgniteTxLocalAdapter.java      |  27 +-
 .../cache/transactions/IgniteTxManager.java     | 154 +++++++-
 .../continuous/GridContinuousHandler.java       |   8 -
 .../continuous/GridContinuousProcessor.java     |  33 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  18 +-
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |  38 +-
 .../binary/BinaryObjectToStringSelfTest.java    |  92 +++++
 .../CacheSwapUnswapGetTestSmallQueueSize.java   |  35 ++
 ...idAbstractCacheInterceptorRebalanceTest.java | 356 +++++++++++++++++++
 ...heInterceptorAtomicOffheapRebalanceTest.java |  30 ++
 ...GridCacheInterceptorAtomicRebalanceTest.java |  36 ++
 ...ceptorTransactionalOffheapRebalanceTest.java |  35 ++
 ...heInterceptorTransactionalRebalanceTest.java |  36 ++
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +
 .../IgniteCacheInterceptorSelfTestSuite.java    |   5 +
 .../IgniteCachePutRetryAbstractSelfTest.java    |  39 +-
 ...gniteCachePutRetryTransactionalSelfTest.java |  75 +++-
 ...ContinuousQueryFailoverAbstractSelfTest.java |  99 ++++++
 ...eContinuousQueryMultiNodesFilteringTest.java | 161 +++++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      |  89 +++++
 .../IgniteBinaryObjectsTestSuite.java           |   2 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 .../config/benchmark-client-mode.properties     |   2 +
 .../config/benchmark-tx-win.properties          |   2 +
 .../yardstick/config/benchmark-tx.properties    |   2 +
 .../yardstick/config/benchmark-win.properties   |   2 +
 modules/yardstick/config/benchmark.properties   |   2 +
 .../cache/IgniteGetAndPutBenchmark.java         |  41 +++
 .../cache/IgniteGetAndPutTxBenchmark.java       |  70 ++++
 .../cache/IgniteInvokeTxBenchmark.java          |  40 +++
 56 files changed, 2663 insertions(+), 447 deletions(-)
----------------------------------------------------------------------



[2/8] ignite git commit: Client discovery: wait during join if receive RES_CONTINUE_JOIN, RES_WAIT.

Posted by ak...@apache.org.
Client discovery: wait during join if receive RES_CONTINUE_JOIN, RES_WAIT.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1372ce2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1372ce2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1372ce2

Branch: refs/heads/master
Commit: c1372ce2f0633968036fcfb079718214605c3350
Parents: 780bf23
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 20 11:39:37 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 20 11:39:37 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java      | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1372ce2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index bf7f519..2c85645 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -497,6 +497,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             Iterator<InetSocketAddress> it = addrs.iterator();
 
+            boolean wait = false;
+
             while (it.hasNext()) {
                 if (Thread.currentThread().isInterrupted())
                     throw new InterruptedException();
@@ -515,12 +517,17 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 Socket sock = sockAndRes.get1().socket();
 
+                if (log.isDebugEnabled())
+                    log.debug("Received response to join request [addr=" + addr + ", res=" + sockAndRes.get2() + ']');
+
                 switch (sockAndRes.get2()) {
                     case RES_OK:
                         return new T2<>(sockAndRes.get1(), sockAndRes.get3());
 
                     case RES_CONTINUE_JOIN:
                     case RES_WAIT:
+                        wait = true;
+
                         U.closeQuiet(sock);
 
                         break;
@@ -533,7 +540,16 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (addrs.isEmpty()) {
+            if (wait) {
+                if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
+                    return null;
+
+                if (log.isDebugEnabled())
+                    log.debug("Will wait before retry join.");
+
+                Thread.sleep(2000);
+            }
+            else if (addrs.isEmpty()) {
                 if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout)
                     return null;
 


[7/8] ignite git commit: Merge ignite-1.6.9 into ignite-1.7.2.

Posted by ak...@apache.org.
Merge ignite-1.6.9 into ignite-1.7.2.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/85c47ebf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/85c47ebf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/85c47ebf

Branch: refs/heads/master
Commit: 85c47ebf2ed8773f699c9cce093cc79696514494
Parents: 83ff7c7 4ee52f0
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:45:19 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:45:19 2016 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      |  42 ++++---
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |   5 +-
 .../internal/processors/igfs/IgfsContext.java   |  35 ++++++
 .../processors/igfs/IgfsDataManager.java        | 121 ++++++++-----------
 .../internal/processors/igfs/IgfsImpl.java      |  82 ++++++++++---
 .../processors/igfs/IgfsInputStreamImpl.java    | 103 +++++++++++-----
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  18 ++-
 .../spi/swapspace/file/FileSwapSpaceSpi.java    |  38 +++++-
 .../binary/BinaryObjectToStringSelfTest.java    |  17 +++
 .../CacheSwapUnswapGetTestSmallQueueSize.java   |  35 ++++++
 .../file/GridFileSwapSpaceSpiSelfTest.java      |  89 ++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 12 files changed, 447 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/85c47ebf/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/85c47ebf/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/85c47ebf/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------


[6/8] ignite git commit: IGNITE-3936 Added check for already processed key types on load cache. Added info message about started/finished load cache. Improved exceptions messages.

Posted by ak...@apache.org.
IGNITE-3936 Added check for already processed key types on load cache. Added info message about started/finished load cache. Improved exceptions messages.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4ee52f0a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4ee52f0a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4ee52f0a

Branch: refs/heads/master
Commit: 4ee52f0a50d9cf8bc64a277f2d02600a832d6ca6
Parents: b3ba8b8
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Sep 21 15:37:52 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Sep 21 15:37:52 2016 +0700

----------------------------------------------------------------------
 .../store/jdbc/CacheAbstractJdbcStore.java      | 42 ++++++++++++--------
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |  5 ++-
 2 files changed, 29 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee52f0a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index a33a1e6..fe8a50b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -827,17 +827,31 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                         throw new CacheLoaderException("Provided key type is not found in store or cache configuration " +
                             "[cache=" + U.maskName(cacheName) + ", key=" + keyType + "]");
 
-                    String selQry = args[i + 1].toString();
+                    String qry = args[i + 1].toString();
 
                     EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType));
 
-                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
+                    if (log.isInfoEnabled())
+                        log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) +
+                            ", keyType=" + keyType + ", query=" + qry + "]");
+
+                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, qry, clo)));
                 }
             }
             else {
-                Collection<EntryMapping> entryMappings = mappings.values();
+                Collection<String> processedKeyTypes = new HashSet<>();
+
+                for (EntryMapping em : mappings.values()) {
+                    String keyType = em.keyType();
+
+                    if (processedKeyTypes.contains(keyType))
+                        continue;
+
+                    processedKeyTypes.add(keyType);
+
+                    if (log.isInfoEnabled())
+                        log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + "]");
 
-                for (EntryMapping em : entryMappings) {
                     if (parallelLoadCacheMinThreshold > 0) {
                         Connection conn = null;
 
@@ -853,7 +867,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                             if (rs.next()) {
                                 if (log.isDebugEnabled())
                                     log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
-                                        ", keyType=" + em.keyType() + " ]");
+                                        ", keyType=" + keyType + "]");
 
                                 int keyCnt = em.keyCols.size();
 
@@ -876,13 +890,13 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                 }
 
                                 futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
-
-                                continue;
                             }
+
+                            continue;
                         }
                         catch (SQLException e) {
-                            log.warning("Failed to load entries from db in multithreaded mode " +
-                                "[cache=" + U.maskName(cacheName) + ", keyType=" + em.keyType() + " ]", e);
+                            log.warning("Failed to load entries from db in multithreaded mode, will try in single thread " +
+                                "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + " ]", e);
                         }
                         finally {
                             U.closeQuiet(conn);
@@ -891,7 +905,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
 
                     if (log.isDebugEnabled())
                         log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
-                            ", keyType=" + em.keyType() + " ]");
+                            ", keyType=" + keyType + "]");
 
                     futs.add(pool.submit(loadCacheFull(em, clo)));
                 }
@@ -900,8 +914,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
             for (Future<?> fut : futs)
                 U.get(fut);
 
-            if (log.isDebugEnabled())
-                log.debug("Cache loaded from db: " + U.maskName(cacheName));
+            if (log.isInfoEnabled())
+                log.info("Finished load cache: " + U.maskName(cacheName));
         }
         catch (IgniteCheckedException e) {
             throw new CacheLoaderException("Failed to load cache: " + U.maskName(cacheName), e.getCause());
@@ -1941,10 +1955,6 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
 
         /** {@inheritDoc} */
         @Override public Void call() throws Exception {
-            if (log.isDebugEnabled())
-                log.debug("Load cache using custom query [cache= " + U.maskName(em.cacheName) +
-                    ", keyType=" + em.keyType() + ", query=" + qry + "]");
-
             Connection conn = null;
 
             PreparedStatement stmt = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4ee52f0a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index 798b84a..dd3e812 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -102,7 +102,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
             return prop.get(obj);
         }
         catch (Exception e) {
-            throw new CacheException("Failed to read object of class: " + typeName, e);
+            throw new CacheException("Failed to read object property [cache=" + U.maskName(cacheName) +
+                ", type=" + typeName + ", prop=" + fldName + "]", e);
         }
     }
 
@@ -262,7 +263,7 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
             return builder.build();
         }
         catch (SQLException e) {
-            throw new CacheException("Failed to read binary object", e);
+            throw new CacheException("Failed to read binary object: " + typeName, e);
         }
     }