You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:13 UTC
[10/14] ignite git commit: IGNITE-3260: IGFS: Delete messages are no longer passed.
IGNITE-3260: IGFS: Delete messages are no longer passed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/065d2e70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/065d2e70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/065d2e70
Branch: refs/heads/ignite-3264
Commit: 065d2e70c21418437eba5e725eaa8b1ebc3af6da
Parents: 0176af1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:12:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 18:12:42 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsAsyncImpl.java | 6 -
.../processors/igfs/IgfsDataManager.java | 61 ++---
.../processors/igfs/IgfsDeleteWorker.java | 42 ----
.../ignite/internal/processors/igfs/IgfsEx.java | 9 -
.../internal/processors/igfs/IgfsImpl.java | 249 +++++--------------
.../internal/processors/igfs/IgfsUtils.java | 2 +-
.../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 -
.../processors/igfs/IgfsSizeSelfTest.java | 133 ----------
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 -
9 files changed, 83 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 8653f90..7530557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.AsyncSupportAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -166,11 +165,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return igfs.awaitDeletesAsync();
- }
-
- /** {@inheritDoc} */
@Nullable @Override public String clientLogDirectory() {
return igfs.clientLogDirectory();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 16fbeb8..57a8c6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -1056,34 +1055,24 @@ public class IgfsDataManager extends IgfsManager {
private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
byte[] data) throws IgniteCheckedException {
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- try {
- igfs.awaitDeletesAsync().get(trashPurgeTimeout);
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Ignore.
- }
+ final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
- // Additional size check.
- if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
-
- if (completionFut == null) {
- if (log.isDebugEnabled())
- log.debug("Missing completion future for file write request (most likely exception occurred " +
- "which will be thrown upon stream close) [fileId=" + fileId + ']');
+ if (completionFut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Missing completion future for file write request (most likely exception occurred " +
+ "which will be thrown upon stream close) [fileId=" + fileId + ']');
- return;
- }
+ return;
+ }
- IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
- "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
- ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
+ IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
+ "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
- completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
- igfsCtx.kernalContext().localNodeId(), e));
+ completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
+ igfsCtx.kernalContext().localNodeId(), e));
- return;
- }
+ return;
}
// No affinity key present, just concat and return.
@@ -1225,26 +1214,10 @@ public class IgfsDataManager extends IgfsManager {
assert !blocks.isEmpty();
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- try {
- try {
- igfs.awaitDeletesAsync().get(trashPurgeTimeout);
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Ignore.
- }
-
- // Additional size check.
- if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax())
- return new GridFinishedFuture<Object>(
- new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
- "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
- ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
-
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " +
- "block due to unexpected exception.", e));
- }
+ return new GridFinishedFuture<Object>(
+ new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
+ "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
}
return dataCachePrj.putAllAsync(blocks);
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bae9354..310090d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -37,8 +34,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-
/**
* IGFS worker for removal from the trash directory.
*/
@@ -49,9 +44,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** How many files/folders to delete at once (i.e in a single transaction). */
private static final int MAX_DELETE_BATCH = 100;
- /** IGFS context. */
- private final IgfsContext igfsCtx;
-
/** Metadata manager. */
private final IgfsMetaManager meta;
@@ -73,9 +65,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** Cancellation flag. */
private volatile boolean cancelled;
- /** Message topic. */
- private Object topic;
-
/**
* Constructor.
*
@@ -84,15 +73,9 @@ public class IgfsDeleteWorker extends IgfsThread {
IgfsDeleteWorker(IgfsContext igfsCtx) {
super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
- this.igfsCtx = igfsCtx;
-
meta = igfsCtx.meta();
data = igfsCtx.data();
- String igfsName = igfsCtx.igfs().name();
-
- topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
assert meta != null;
assert data != null;
@@ -189,8 +172,6 @@ public class IgfsDeleteWorker extends IgfsThread {
if (log.isDebugEnabled())
log.debug("Sending delete confirmation message [name=" + entry.getKey() +
", fileId=" + fileId + ']');
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId));
}
}
else
@@ -201,8 +182,6 @@ public class IgfsDeleteWorker extends IgfsThread {
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
}
}
}
@@ -346,25 +325,4 @@ public class IgfsDeleteWorker extends IgfsThread {
return true; // Directory entry was deleted concurrently.
}
}
-
- /**
- * Send delete message to all meta cache nodes in the grid.
- *
- * @param msg Message to send.
- */
- private void sendDeleteMessage(IgfsDeleteMessage msg) {
- assert msg != null;
-
- Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
- for (ClusterNode node : nodes) {
- try {
- igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
- ", msg=" + msg + ", err=" + e.getMessage() + ']');
- }
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index fb67e20..4c64bc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -100,14 +99,6 @@ public interface IgfsEx extends IgniteFileSystem {
public long groupBlockSize();
/**
- * Asynchronously await for all entries existing in trash to be removed.
- *
- * @return Future which will be completed when all entries existed in trash by the time of invocation are removed.
- * @throws IgniteCheckedException If failed.
- */
- public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException;
-
- /**
* Gets client file system log directory.
*
* @return Client file system log directory or {@code null} in case no client connections have been created yet.
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 9087ff0..262dfef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInvalidPathException;
import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,8 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -100,11 +95,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -114,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
/**
* Cache-based IGFS implementation.
@@ -130,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
/** Default permissions for file system entry. */
private static final String PERMISSION_DFLT_VAL = "0777";
+ /** Index generator for async format threads. */
+ private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
/** Default directory metadata. */
static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
@@ -169,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
/** Writers map. */
private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>();
- /** Delete futures. */
- private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>();
-
- /** Delete message listener. */
- private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
- /** Format discovery listener. */
- private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
/** Local metrics holder. */
private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
/** Client log directory. */
private volatile String logDir;
- /** Message topic. */
- private Object topic;
-
/** Eviction policy (if set). */
private IgfsPerBlockLruEvictionPolicy evictPlc;
@@ -292,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
- igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
}
@@ -332,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
// Restore interrupted flag.
if (interrupted)
Thread.currentThread().interrupt();
@@ -1381,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
/** {@inheritDoc} */
@Override public void format() {
try {
- formatAsync().get();
+ IgniteUuid id = meta.format();
+
+ // If ID is null, then file system is already empty.
+ if (id == null)
+ return;
+
+ while (true) {
+ if (enterBusy()) {
+ try {
+ if (!meta.exists(id))
+ return;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ U.sleep(10);
+ }
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1394,69 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
* @return Future.
*/
IgniteInternalFuture<?> formatAsync() {
- try {
- IgniteUuid id = meta.format();
-
- if (id == null)
- return new GridFinishedFuture<Object>();
- else {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
- if (oldFut != null)
- return oldFut;
- else {
- if (!meta.exists(id)) {
- // Safety in case response message was received before we put future into collection.
- fut.onDone();
-
- delFuts.remove(id, fut);
- }
+ GridFutureAdapter<?> fut = new GridFutureAdapter<>();
- return fut;
- }
- }
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<Object>(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- Collection<IgniteUuid> ids = meta.pendingDeletes();
+ Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
+ FORMAT_THREAD_IDX_GEN.incrementAndGet());
- if (!ids.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Constructing delete future for trash entries: " + ids);
+ t.setDaemon(true);
- GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>();
+ t.start();
- for (IgniteUuid id : ids) {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
- IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
- if (oldFut != null)
- resFut.add(oldFut);
- else {
- if (meta.exists(id))
- resFut.add(fut);
- else {
- fut.onDone();
-
- delFuts.remove(id, fut);
- }
- }
- }
-
- resFut.markInitialized();
-
- return resFut;
- }
- else
- return new GridFinishedFuture<>();
+ return fut;
}
/**
@@ -1482,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
}
- /**
- * Check whether IGFS with the same name exists among provided attributes.
- *
- * @param attrs Attributes.
- * @return {@code True} in case IGFS with the same name exists among provided attributes
- */
- private boolean sameIgfs(IgfsAttributes[] attrs) {
- if (attrs != null) {
- String igfsName = name();
-
- for (IgfsAttributes attr : attrs) {
- if (F.eq(igfsName, attr.igfsName()))
- return true;
- }
- }
- return false;
- }
-
/** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1905,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- /**
- * Format message listener required for format action completion.
- */
- private class FormatMessageListener implements GridMessageListener {
- /** {@inheritDoc} */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void onMessage(UUID nodeId, Object msg) {
- if (msg instanceof IgfsDeleteMessage) {
- ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
- if (node != null) {
- if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
- IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
- try {
- msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
-
- return;
- }
-
- assert msg0.id() != null;
-
- GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
- if (fut != null) {
- if (msg0.error() == null)
- fut.onDone();
- else
- fut.onDone(msg0.error());
- }
- }
- }
- }
- }
- }
-
- /**
- * Discovery listener required for format actions completion.
- */
- private class FormatDiscoveryListener implements GridLocalEventListener {
- /** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
- DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
- if (evt0.eventNode() != null) {
- if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
- Collection<IgniteUuid> rmv = new HashSet<>();
-
- for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
- IgniteUuid id = fut.getKey();
-
- try {
- if (!meta.exists(id)) {
- fut.getValue().onDone();
-
- rmv.add(id);
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to check file existence: " + id, e);
- }
- }
-
- for (IgniteUuid id : rmv)
- delFuts.remove(id);
- }
- }
- }
- }
-
/** {@inheritDoc} */
@Override public IgniteUuid nextAffinityKey() {
return safeOp(new Callable<IgniteUuid>() {
@@ -2079,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
return t;
}
}
+
+ /**
+ * Format runnable.
+ */
+ private class FormatRunnable implements Runnable {
+ /** Target future. */
+ private final GridFutureAdapter<?> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param fut Future.
+ */
+ public FormatRunnable(GridFutureAdapter<?> fut) {
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgfsException err = null;
+
+ try {
+ format();
+ }
+ catch (Throwable err0) {
+ err = IgfsUtils.toIgfsException(err0);
+ }
+ finally {
+ if (err == null)
+ fut.onDone();
+ else
+ fut.onDone(err);
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 6fa9877..cfe549f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -180,7 +180,7 @@ public class IgfsUtils {
* @return Converted IGFS exception.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public static IgfsException toIgfsException(Exception err) {
+ public static IgfsException toIgfsException(Throwable err) {
IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
IgfsException igfsErr = X.cause(err, IgfsException.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
index fd4ec17..4e0f12b 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
@@ -239,8 +239,6 @@ public class IgfsFragmentizerSelfTest extends IgfsFragmentizerAbstractSelfTest {
igfs.format();
- igfs.awaitDeletesAsync().get();
-
GridTestUtils.retryAssert(log, 50, 100, new CA() {
@Override public void apply() {
for (int i = 0; i < NODE_CNT; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
index 3933e86..266945f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -41,27 +40,21 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.transactions.Transaction;
import org.jsr166.ThreadLocalRandom8;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* {@link IgfsAttributes} test case.
@@ -256,41 +249,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testPartitionedOversizeDelay() throws Exception {
- cacheMode = PARTITIONED;
- nearEnabled = true;
-
- checkOversizeDelay();
- }
-
- /**
- * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testColocatedOversizeDelay() throws Exception {
- cacheMode = PARTITIONED;
- nearEnabled = false;
-
- checkOversizeDelay();
- }
-
- /**
- * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testReplicatedOversizeDelay() throws Exception {
- cacheMode = REPLICATED;
-
- checkOversizeDelay();
- }
-
- /**
* Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache.
*
* @throws Exception If failed.
@@ -484,97 +442,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Ensure that exception is not thrown or thrown with some delay when there is something in trash directory.
- *
- * @throws Exception If failed.
- */
- private void checkOversizeDelay() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
-
- igfsMaxData = 256;
- trashPurgeTimeout = 2000;
-
- startUp();
-
- IgfsImpl igfs = igfs(0);
-
- final IgfsPath path = new IgfsPath("/file");
- final IgfsPath otherPath = new IgfsPath("/fileOther");
-
- // Fill cache with data up to it's limit.
- IgfsOutputStream os = igfs.create(path, false);
- os.write(chunk((int)igfsMaxData));
- os.close();
-
- final IgniteCache<IgniteUuid, IgfsEntryInfo> metaCache = igfs.context().kernalContext().cache().jcache(
- igfs.configuration().getMetaCacheName());
-
- // Start a transaction in a separate thread which will lock file ID.
- final IgniteUuid id = igfs.context().meta().fileId(path);
- final IgfsEntryInfo info = igfs.context().meta().info(id);
-
- final AtomicReference<Throwable> err = new AtomicReference<>();
-
- try {
- new Thread(new Runnable() {
- @Override public void run() {
- try {
-
- try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- metaCache.get(id);
-
- latch.await();
-
- U.sleep(1000); // Sleep here so that data manager could "see" oversize.
-
- tx.commit();
- }
- }
- catch (Throwable e) {
- err.set(e);
- }
- }
- }).start();
-
- // Now add file ID to trash listing so that delete worker could "see" it.
- IgniteUuid trashId = IgfsUtils.randomTrashId();
-
- try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(),
- new IgfsListingEntry(info));
-
- // Clear root listing.
- metaCache.put(IgfsUtils.ROOT_ID, IgfsUtils.createDirectory(IgfsUtils.ROOT_ID));
-
- // Add file to trash listing.
- IgfsEntryInfo trashInfo = metaCache.get(trashId);
-
- if (trashInfo == null)
- metaCache.put(trashId, IgfsUtils.createDirectory(trashId).listing(listing));
- else
- metaCache.put(trashId, trashInfo.listing(listing));
-
- tx.commit();
- }
-
- assert metaCache.get(trashId) != null;
-
- // Now the file is locked and is located in trash, try adding some more data.
- os = igfs.create(otherPath, false);
- os.write(new byte[1]);
-
- latch.countDown();
-
- os.close();
-
- assert err.get() == null;
- }
- finally {
- latch.countDown(); // Safety.
- }
- }
-
- /**
* Ensure that IGFS size is correctly updated in case of preloading.
*
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index b38f3a2..ffa6f7d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -754,11 +753,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
@Nullable @Override public String clientLogDirectory() {
return null;
}