You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 15:03:07 UTC
ignite git commit: IGNITE-1298: Now output stream flush awaits for
all data blocks to be processed before updating file length in meta cache.
Repository: ignite
Updated Branches:
refs/heads/master 16c095a9e -> 02f246535
IGNITE-1298: Now output stream flush awaits for all data blocks to be processed before updating file length in meta cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02f24653
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02f24653
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02f24653
Branch: refs/heads/master
Commit: 02f246535dd31346ff94485810e3bb306bb67cbb
Parents: 16c095a
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri Aug 28 16:03:45 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 16:03:45 2015 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsDataManager.java | 169 +++++++++----------
.../processors/igfs/IgfsOutputStreamImpl.java | 2 +
.../igfs/IgfsBackupFailoverSelfTest.java | 137 +++++++++++++--
3 files changed, 203 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index aa6427d..602924d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.processors.task.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -323,58 +322,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Get list of local data blocks of the given file.
- *
- * @param fileInfo File info.
- * @return List of local data block indices.
- * @throws IgniteCheckedException If failed.
- */
- public List<Long> listLocalDataBlocks(IgfsFileInfo fileInfo)
- throws IgniteCheckedException {
- assert fileInfo != null;
-
- int prevGrpIdx = 0; // Block index within affinity group.
-
- boolean prevPrimaryFlag = false; // Whether previous block was primary.
-
- List<Long> res = new ArrayList<>();
-
- for (long i = 0; i < fileInfo.blocksCount(); i++) {
- // Determine group index.
- int grpIdx = (int)(i % grpSize);
-
- if (prevGrpIdx < grpIdx) {
- // Reuse existing affinity result.
- if (prevPrimaryFlag)
- res.add(i);
- }
- else {
- // Re-calculate affinity result.
- IgfsBlockKey key = new IgfsBlockKey(fileInfo.id(), fileInfo.affinityKey(),
- fileInfo.evictExclude(), i);
-
- Collection<ClusterNode> affNodes = dataCache.affinity().mapKeyToPrimaryAndBackups(key);
-
- assert affNodes != null && !affNodes.isEmpty();
-
- ClusterNode primaryNode = affNodes.iterator().next();
-
- if (primaryNode.id().equals(igfsCtx.kernalContext().localNodeId())) {
- res.add(i);
-
- prevPrimaryFlag = true;
- }
- else
- prevPrimaryFlag = false;
- }
-
- prevGrpIdx = grpIdx;
- }
-
- return res;
- }
-
- /**
* Get data block for specified file ID and block index.
*
* @param fileInfo File info.
@@ -1764,6 +1711,19 @@ public class IgfsDataManager extends IgfsManager {
}
/**
+ * Allows output stream to await for all current acks.
+ *
+ * @param fileId File ID.
+ * @throws IgniteInterruptedCheckedException In case of interrupt.
+ */
+ void awaitAllAcksReceived(IgniteUuid fileId) throws IgniteInterruptedCheckedException {
+ WriteCompletionFuture fut = pendingWrites.get(fileId);
+
+ if (fut != null)
+ fut.awaitAllAcksReceived();
+ }
+
+ /**
* Future that is completed when all participating write requests are acknowledged.
*/
private class WriteCompletionFuture extends GridFutureAdapter<Boolean> {
@@ -1771,10 +1731,16 @@ public class IgfsDataManager extends IgfsManager {
private static final long serialVersionUID = 0L;
/** File id to remove future from map. */
- private IgniteUuid fileId;
+ private final IgniteUuid fileId;
/** Pending acks. */
- private ConcurrentMap<UUID, Set<Long>> pendingAcks = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, UUID> ackMap = new ConcurrentHashMap8<>();
+
+ /** Lock for map-related conditions. */
+ private final Lock lock = new ReentrantLock();
+
+ /** Condition to wait for empty map. */
+ private final Condition allAcksRcvCond = lock.newCondition();
/** Flag indicating future is waiting for last ack. */
private volatile boolean awaitingLast;
@@ -1788,6 +1754,23 @@ public class IgfsDataManager extends IgfsManager {
this.fileId = fileId;
}
+ /**
+ * Await all pending data blocks to be acked.
+ *
+ * @throws IgniteInterruptedCheckedException In case of interrupt.
+ */
+ public void awaitAllAcksReceived() throws IgniteInterruptedCheckedException {
+ lock.lock();
+
+ try {
+ while (!ackMap.isEmpty())
+ U.await(allAcksRcvCond);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
if (!isDone()) {
@@ -1808,26 +1791,41 @@ public class IgfsDataManager extends IgfsManager {
*/
private void onWriteRequest(UUID nodeId, long batchId) {
if (!isDone()) {
- Set<Long> reqIds = pendingAcks.get(nodeId);
+ UUID pushedOut = ackMap.putIfAbsent(batchId, nodeId);
- if (reqIds == null)
- reqIds = F.addIfAbsent(pendingAcks, nodeId, new GridConcurrentHashSet<Long>());
-
- reqIds.add(batchId);
+ assert pushedOut == null;
}
}
/**
+ * Answers if there are some batches for the specified node we're currently waiting acks for.
+ *
+ * @param nodeId The node Id.
+ * @return If there are acks awaited from this node.
+ */
+ private boolean hasPendingAcks(UUID nodeId) {
+ assert nodeId != null;
+
+ for (Map.Entry<Long, UUID> e : ackMap.entrySet())
+ if (nodeId.equals(e.getValue()))
+ return true;
+
+ return false;
+ }
+
+ /**
* Error occurred on node with given ID.
*
* @param nodeId Node ID.
* @param e Caught exception.
*/
private void onError(UUID nodeId, IgniteCheckedException e) {
- Set<Long> reqIds = pendingAcks.get(nodeId);
-
// If waiting for ack from this node.
- if (reqIds != null && !reqIds.isEmpty()) {
+ if (hasPendingAcks(nodeId)) {
+ ackMap.clear();
+
+ signalNoAcks();
+
if (e.hasCause(IgfsOutOfSpaceException.class))
onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " + nodeId, e));
else
@@ -1844,18 +1842,31 @@ public class IgfsDataManager extends IgfsManager {
*/
private void onWriteAck(UUID nodeId, long batchId) {
if (!isDone()) {
- Set<Long> reqIds = pendingAcks.get(nodeId);
+ boolean rmv = ackMap.remove(batchId, nodeId);
- assert reqIds != null : "Received acknowledgement message for not registered node [nodeId=" +
+ assert rmv : "Received acknowledgement message for not registered batch [nodeId=" +
nodeId + ", batchId=" + batchId + ']';
- boolean rmv = reqIds.remove(batchId);
+ if (ackMap.isEmpty()) {
+ signalNoAcks();
- assert rmv : "Received acknowledgement message for not registered batch [nodeId=" +
- nodeId + ", batchId=" + batchId + ']';
+ if (awaitingLast)
+ onDone(true);
+ }
+ }
+ }
+
+ /**
+ * Signal that currently there are no more pending acks.
+ */
+ private void signalNoAcks() {
+ lock.lock();
- if (awaitingLast && checkCompleted())
- onDone(true);
+ try {
+ allAcksRcvCond.signalAll();
+ }
+ finally {
+ lock.unlock();
}
}
@@ -1868,24 +1879,8 @@ public class IgfsDataManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Marked write completion future as awaiting last ack: " + fileId);
- if (checkCompleted())
+ if (ackMap.isEmpty())
onDone(true);
}
-
- /**
- * @return True if received all request acknowledgements after {@link #markWaitingLastAck()} was called.
- */
- private boolean checkCompleted() {
- for (Map.Entry<UUID, Set<Long>> entry : pendingAcks.entrySet()) {
- Set<Long> reqIds = entry.getValue();
-
- // If still waiting for some acks.
- if (!reqIds.isEmpty())
- return false;
- }
-
- // Got match for each entry in sent map.
- return true;
- }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 298733a..01359b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -283,6 +283,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
}
if (space > 0) {
+ data.awaitAllAcksReceived(fileInfo.id());
+
IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
new ReserveSpaceClosure(space, streamRange));
http://git-wip-us.apache.org/repos/asf/ignite/blob/02f24653/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
index 0162121..09cecaa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBackupFailoverSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.igfs.secondary.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.*;
@@ -108,21 +109,6 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Creates IPC configuration.
- *
- * @param port The port to use.
- * @return The endpoint configuration.
- */
- protected IgfsIpcEndpointConfiguration createIgfsRestConfig(int port) {
- IgfsIpcEndpointConfiguration cfg = new IgfsIpcEndpointConfiguration();
-
- cfg.setType(IgfsIpcEndpointType.TCP);
- cfg.setPort(port);
-
- return cfg;
- }
-
- /**
* Start grid with IGFS.
*
* @param gridName Grid name.
@@ -296,8 +282,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
final AtomicBoolean stop = new AtomicBoolean();
GridTestUtils.runMultiThreadedAsync(new Callable() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
Thread.sleep(1_000); // Some delay to ensure read is in progress.
// Now stop all the nodes but the 1st:
@@ -390,7 +375,7 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
final int f = f0;
- int att = doWithRetries(2, new Callable<Void>() {
+ int att = doWithRetries(1, new Callable<Void>() {
@Override public Void call() throws Exception {
IgfsOutputStream ios = os;
@@ -411,6 +396,8 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
}
});
+ assert att == 1;
+
X.println("write #2 completed: " + f0 + " in " + att + " attempts.");
}
@@ -432,6 +419,120 @@ public class IgfsBackupFailoverSelfTest extends IgfsCommonAbstractTest {
}
/**
+ * Tests write failover when multiple backup nodes are stopped while writes are in progress.
+ *
+ * @throws Exception If failed.
+ */
+ public void testWriteFailoverWhileStoppingMultipleNodes() throws Exception {
+ final IgfsImpl igfs0 = nodeDatas[0].igfsImpl;
+
+ clear(igfs0);
+
+ IgfsAbstractSelfTest.create(igfs0, paths(DIR, SUBDIR), null);
+
+ final IgfsOutputStream[] outStreams = new IgfsOutputStream[files];
+
+ // Create files:
+ for (int f = 0; f < files; f++) {
+ final byte[] data = createChunk(fileSize, f);
+
+ IgfsOutputStream os = null;
+
+ try {
+ os = igfs0.create(filePath(f), 256, true, null, 0, -1, null);
+
+ assert os != null;
+
+ writeFileChunks(os, data);
+ }
+ finally {
+ if (os != null)
+ os.flush();
+ }
+
+ outStreams[f] = os;
+
+ X.println("write #1 completed: " + f);
+ }
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ GridTestUtils.runMultiThreadedAsync(new Callable() {
+ @Override public Object call() throws Exception {
+ Thread.sleep(10_000); // Some delay to ensure read is in progress.
+
+ // Now stop all the nodes but the 1st:
+ for (int n = 1; n < numIgfsNodes; n++) {
+ stopGrid(n);
+
+ X.println("#### grid " + n + " stopped.");
+ }
+
+ //Thread.sleep(10_000);
+
+ stop.set(true);
+
+ return null;
+ }
+ }, 1, "igfs-node-stopper");
+
+ // Write #2:
+ for (int f0 = 0; f0 < files; f0++) {
+ final IgfsOutputStream os = outStreams[f0];
+
+ assert os != null;
+
+ final int f = f0;
+
+ int att = doWithRetries(1, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ IgfsOutputStream ios = os;
+
+ try {
+ writeChunks0(igfs0, ios, f);
+ }
+ catch (IOException ioe) {
+ log().warning("Attempt to append the data to existing stream failed: ", ioe);
+
+ ios = igfs0.append(filePath(f), false);
+
+ assert ios != null;
+
+ writeChunks0(igfs0, ios, f);
+ }
+
+ return null;
+ }
+ });
+
+ assert att == 1;
+
+ X.println("write #2 completed: " + f0 + " in " + att + " attempts.");
+ }
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return stop.get();
+ }
+ }, 25_000);
+
+ // Check files:
+ for (int f = 0; f < files; f++) {
+ IgfsPath path = filePath(f);
+
+ byte[] data = createChunk(fileSize, f);
+
+ // Check through 1st node:
+ checkExist(igfs0, path);
+
+ assertEquals("File length mismatch.", data.length * 2, igfs0.size(path));
+
+ checkFileContent(igfs0, path, data, data);
+
+ X.println("Read test completed: " + f);
+ }
+ }
+
+ /**
* Writes data to the file of the specified index and closes the output stream.
*
* @param igfs0 IGFS.