You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 15:03:07 UTC

ignite git commit: IGNITE-1298: Now output stream flush awaits for all data blocks to be processed before updating file length in meta cache.

Repository: ignite
Updated Branches:
  refs/heads/master 16c095a9e -> 02f246535


IGNITE-1298: Now output stream flush awaits for all data blocks to be processed before updating file length in meta cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02f24653
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02f24653
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02f24653

Branch: refs/heads/master
Commit: 02f246535dd31346ff94485810e3bb306bb67cbb
Parents: 16c095a
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri Aug 28 16:03:45 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 16:03:45 2015 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsDataManager.java        | 169 +++++++++----------
 .../processors/igfs/IgfsOutputStreamImpl.java   |   2 +
 .../igfs/IgfsBackupFailoverSelfTest.java        | 137 +++++++++++++--
 3 files changed, 203 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index aa6427d..602924d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.processors.task.*;
-import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -323,58 +322,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Get list of local data blocks of the given file.
-     *
-     * @param fileInfo File info.
-     * @return List of local data block indices.
-     * @throws IgniteCheckedException If failed.
-     */
-    public List<Long> listLocalDataBlocks(IgfsFileInfo fileInfo)
-        throws IgniteCheckedException {
-        assert fileInfo != null;
-
-        int prevGrpIdx = 0; // Block index within affinity group.
-
-        boolean prevPrimaryFlag = false; // Whether previous block was primary.
-
-        List<Long> res = new ArrayList<>();
-
-        for (long i = 0; i < fileInfo.blocksCount(); i++) {
-            // Determine group index.
-            int grpIdx = (int)(i % grpSize);
-
-            if (prevGrpIdx < grpIdx) {
-                // Reuse existing affinity result.
-                if (prevPrimaryFlag)
-                    res.add(i);
-            }
-            else {
-                // Re-calculate affinity result.
-                IgfsBlockKey key = new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(),
-                    fileInfo.evictExclude(), i);
-
-                Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key);
-
-                assert affNodes != null && !affNodes.isEmpty();
-
-                ClusterNode primaryNode = affNodes.iterator().next();
-
-                if (primaryNode.id().equals(igfsCtx.kernalContext().localNodeId())) {
-                    res.add(i);
-
-                    prevPrimaryFlag = true;
-                }
-                else
-                    prevPrimaryFlag = false;
-            }
-
-            prevGrpIdx = grpIdx;
-        }
-
-        return res;
-    }
-
-    /**
      * Get data block for specified file ID and block index.
      *
      * @param fileInfo File info.
@@ -1764,6 +1711,19 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
+     * Allows output stream to await for all current acks.
+     *
+     * @param fileId File ID.
+     * @throws IgniteInterruptedCheckedException In case of interrupt.
+     */
+    void awaitAllAcksReceived(IgniteUuid fileId) throws IgniteInterruptedCheckedException {
+        WriteCompletionFuture fut = pendingWrites.get(fileId);
+
+        if (fut != null)
+            fut.awaitAllAcksReceived();
+    }
+
+    /**
      * Future that is completed when all participating
      */
     private class WriteCompletionFuture extends GridFutureAdapter<Boolean> {
@@ -1771,10 +1731,16 @@ public class IgfsDataManager extends IgfsManager {
         private static final long serialVersionUID = 0L;
 
         /** File id to remove future from map. */
-        private IgniteUuid fileId;
+        private final IgniteUuid fileId;
 
         /** Pending acks. */
-        private ConcurrentMap<UUID, Set<Long>> pendingAcks = new ConcurrentHashMap8<>();
+        private final ConcurrentMap<Long, UUID> ackMap = new ConcurrentHashMap8<>();
+
+        /** Lock for map-related conditions. */
+        private final Lock lock = new ReentrantLock();
+
+        /** Condition to wait for empty map. */
+        private final Condition allAcksRcvCond = lock.newCondition();
 
         /** Flag indicating future is waiting for last ack. */
         private volatile boolean awaitingLast;
@@ -1788,6 +1754,23 @@ public class IgfsDataManager extends IgfsManager {
             this.fileId = fileId;
         }
 
+        /**
+         * Await all pending data blockes to be acked.
+         *
+         * @throws IgniteInterruptedCheckedException In case of interrupt.
+         */
+        public void awaitAllAcksReceived() throws IgniteInterruptedCheckedException {
+            lock.lock();
+
+            try {
+                while (!ackMap.isEmpty())
+                    U.await(allAcksRcvCond);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
             if (!isDone()) {
@@ -1808,26 +1791,41 @@ public class IgfsDataManager extends IgfsManager {
          */
         private void onWriteRequest(UUID nodeId, long batchId) {
             if (!isDone()) {
-                Set<Long> reqIds = pendingAcks.get(nodeId);
+                UUID pushedOut = ackMap.putIfAbsent(batchId, nodeId);
 
-                if (reqIds == null)
-                    reqIds = F.addIfAbsent(pendingAcks, nodeId, new GridConcurrentHashSet<Long>());
-
-                reqIds.add(batchId);
+                assert pushedOut == null;
             }
         }
 
         /**
+         * Answers if there are some batches for the specified node we're currently waiting acks for.
+         *
+         * @param nodeId The node Id.
+         * @return If there are acks awaited from this node.
+         */
+        private boolean hasPendingAcks(UUID nodeId) {
+            assert nodeId != null;
+
+            for (Map.Entry<Long, UUID> e : ackMap.entrySet())
+                if (nodeId.equals(e.getValue()))
+                    return true;
+
+            return false;
+        }
+
+        /**
          * Error occurred on node with given ID.
          *
          * @param nodeId Node ID.
          * @param e Caught exception.
          */
         private void onError(UUID nodeId, IgniteCheckedException e) {
-            Set<Long> reqIds = pendingAcks.get(nodeId);
-
             // If waiting for ack from this node.
-            if (reqIds != null && !reqIds.isEmpty()) {
+            if (hasPendingAcks(nodeId)) {
+                ackMap.clear();
+
+                signalNoAcks();
+
                 if (e.hasCause(IgfsOutOfSpaceException.class))
                     onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + nodeId, e));
                 else
@@ -1844,18 +1842,31 @@ public class IgfsDataManager extends IgfsManager {
          */
         private void onWriteAck(UUID nodeId, long batchId) {
             if (!isDone()) {
-                Set<Long> reqIds = pendingAcks.get(nodeId);
+                boolean rmv = ackMap.remove(batchId, nodeId);
 
-                assert reqIds != null : "Received acknowledgement message for not registered node [nodeId=" +
+                assert rmv : "Received acknowledgement message for not registered batch [nodeId=" +
                     nodeId + ", batchId=" + batchId + ']';
 
-                boolean rmv = reqIds.remove(batchId);
+                if (ackMap.isEmpty()) {
+                    signalNoAcks();
 
-                assert rmv : "Received acknowledgement message for not registered batch [nodeId=" +
-                    nodeId + ", batchId=" + batchId + ']';
+                    if (awaitingLast)
+                        onDone(true);
+                }
+            }
+        }
+
+        /**
+         * Signal that currenlty there are no more pending acks.
+         */
+        private void signalNoAcks() {
+            lock.lock();
 
-                if (awaitingLast && checkCompleted())
-                    onDone(true);
+            try {
+                allAcksRcvCond.signalAll();
+            }
+            finally {
+                lock.unlock();
             }
         }
 
@@ -1868,24 +1879,8 @@ public class IgfsDataManager extends IgfsManager {
             if (log.isDebugEnabled())
                 log.debug("Marked write completion future as awaiting last ack: " + fileId);
 
-            if (checkCompleted())
+            if (ackMap.isEmpty())
                 onDone(true);
         }
-
-        /**
-         * @return True if received all request acknowledgements after {@link #markWaitingLastAck()} was called.
-         */
-        private boolean checkCompleted() {
-            for (Map.Entry<UUID, Set<Long>> entry : pendingAcks.entrySet()) {
-                Set<Long> reqIds = entry.getValue();
-
-                // If still waiting for some acks.
-                if (!reqIds.isEmpty())
-                    return false;
-            }
-
-            // Got match for each entry in sent map.
-            return true;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 298733a..01359b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -283,6 +283,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
             }
 
             if (space > 0) {
+                data.awaitAllAcksReceived(fileInfo.id());
+
                 IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
                     new ReserveSpaceClosure(space, streamRange));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
index 0162121..09cecaa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.secondary.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
@@ -108,21 +109,6 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
-     * Creates IPC configuration.
-     *
-     * @param port The port to use.
-     * @return The endpoint configuration.
-     */
-    protected IgfsIpcEndpointConfiguration createIgfsRestConfig(int port) {
-        IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration();
-
-        cfg.setType(IgfsIpcEndpointType.TCP);
-        cfg.setPort(port);
-
-        return cfg;
-    }
-
-    /**
      * Start grid with IGFS.
      *
      * @param gridName Grid name.
@@ -296,8 +282,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
         final AtomicBoolean stop = new AtomicBoolean();
 
         GridTestUtils.runMultiThreadedAsync(new Callable() {
-            @Override
-            public Object call() throws Exception {
+            @Override public Object call() throws Exception {
                 Thread.sleep(1_000); // Some delay to ensure read is in progress.
 
                 // Now stop all the nodes but the 1st:
@@ -390,7 +375,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
 
             final int f = f0;
 
-            int att = doWithRetries(2, new Callable<Void>() {
+            int att = doWithRetries(1, new Callable<Void>() {
                 @Override public Void call() throws Exception {
                     IgfsOutputStream ios = os;
 
@@ -411,6 +396,8 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
                 }
             });
 
+            assert att == 1;
+
             X.println("write #2 completed: " + f0 + " in " + att + " attempts.");
         }
 
@@ -432,6 +419,120 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
+     *
+     * @throws Exception
+     */
+    public void testWriteFailoverWhileStoppingMultipleNodes() throws Exception {
+        final IgfsImpl igfs0 = nodeDatas[0].igfsImpl;
+
+        clear(igfs0);
+
+        IgfsAbstractSelfTest.create(igfs0, paths(DIR, SUBDIR), null);
+
+        final IgfsOutputStream[] outStreams = new IgfsOutputStream[files];
+
+        // Create files:
+        for (int f = 0; f < files; f++) {
+            final byte[] data = createChunk(fileSize, f);
+
+            IgfsOutputStream os = null;
+
+            try {
+                os = igfs0.create(filePath(f), 256, true, null, 0, -1, null);
+
+                assert os != null;
+
+                writeFileChunks(os, data);
+            }
+            finally {
+                if (os != null)
+                    os.flush();
+            }
+
+            outStreams[f] = os;
+
+            X.println("write #1 completed: " + f);
+        }
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        GridTestUtils.runMultiThreadedAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                Thread.sleep(10_000); // Some delay to ensure read is in progress.
+
+                // Now stop all the nodes but the 1st:
+                for (int n = 1; n < numIgfsNodes; n++) {
+                    stopGrid(n);
+
+                    X.println("#### grid " + n + " stopped.");
+                }
+
+                //Thread.sleep(10_000);
+
+                stop.set(true);
+
+                return null;
+            }
+        }, 1, "igfs-node-stopper");
+
+        // Write #2:
+        for (int f0 = 0; f0 < files; f0++) {
+            final IgfsOutputStream os = outStreams[f0];
+
+            assert os != null;
+
+            final int f = f0;
+
+            int att = doWithRetries(1, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgfsOutputStream ios = os;
+
+                    try {
+                        writeChunks0(igfs0, ios, f);
+                    }
+                    catch (IOException ioe) {
+                        log().warning("Attempt to append the data to existing stream failed: ", ioe);
+
+                        ios = igfs0.append(filePath(f), false);
+
+                        assert ios != null;
+
+                        writeChunks0(igfs0, ios, f);
+                    }
+
+                    return null;
+                }
+            });
+
+            assert att == 1;
+
+            X.println("write #2 completed: " + f0 + " in " + att + " attempts.");
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return stop.get();
+            }
+        }, 25_000);
+
+        // Check files:
+        for (int f = 0; f < files; f++) {
+            IgfsPath path = filePath(f);
+
+            byte[] data = createChunk(fileSize, f);
+
+            // Check through 1st node:
+            checkExist(igfs0, path);
+
+            assertEquals("File length mismatch.", data.length * 2, igfs0.size(path));
+
+            checkFileContent(igfs0, path, data, data);
+
+            X.println("Read test completed: " + f);
+        }
+    }
+
+    /**
      * Writes data to the file of the specified index and closes the output stream.
      *
      * @param igfs0 IGFS.