You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/06 14:28:32 UTC
ignite git commit: IGNITE-3260: Removed delete message routines.
Repository: ignite
Updated Branches:
refs/heads/ignite-3260 c84140784 -> 062dc4ea7
IGNITE-3260: Removed delete message routines.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/062dc4ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/062dc4ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/062dc4ea
Branch: refs/heads/ignite-3260
Commit: 062dc4ea78b6616e7ef4f944614728acd2331661
Parents: c841407
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 17:27:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 17:27:48 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsDeleteWorker.java | 42 ----
.../internal/processors/igfs/IgfsImpl.java | 212 ++++++-------------
.../internal/processors/igfs/IgfsUtils.java | 2 +-
3 files changed, 66 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/062dc4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bae9354..310090d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -37,8 +34,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-
/**
* IGFS worker for removal from the trash directory.
*/
@@ -49,9 +44,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** How many files/folders to delete at once (i.e in a single transaction). */
private static final int MAX_DELETE_BATCH = 100;
- /** IGFS context. */
- private final IgfsContext igfsCtx;
-
/** Metadata manager. */
private final IgfsMetaManager meta;
@@ -73,9 +65,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** Cancellation flag. */
private volatile boolean cancelled;
- /** Message topic. */
- private Object topic;
-
/**
* Constructor.
*
@@ -84,15 +73,9 @@ public class IgfsDeleteWorker extends IgfsThread {
IgfsDeleteWorker(IgfsContext igfsCtx) {
super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
- this.igfsCtx = igfsCtx;
-
meta = igfsCtx.meta();
data = igfsCtx.data();
- String igfsName = igfsCtx.igfs().name();
-
- topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
assert meta != null;
assert data != null;
@@ -189,8 +172,6 @@ public class IgfsDeleteWorker extends IgfsThread {
if (log.isDebugEnabled())
log.debug("Sending delete confirmation message [name=" + entry.getKey() +
", fileId=" + fileId + ']');
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId));
}
}
else
@@ -201,8 +182,6 @@ public class IgfsDeleteWorker extends IgfsThread {
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
}
}
}
@@ -346,25 +325,4 @@ public class IgfsDeleteWorker extends IgfsThread {
return true; // Directory entry was deleted concurrently.
}
}
-
- /**
- * Send delete message to all meta cache nodes in the grid.
- *
- * @param msg Message to send.
- */
- private void sendDeleteMessage(IgfsDeleteMessage msg) {
- assert msg != null;
-
- Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
- for (ClusterNode node : nodes) {
- try {
- igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
- ", msg=" + msg + ", err=" + e.getMessage() + ']');
- }
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/062dc4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index bc461b7..262dfef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInvalidPathException;
import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,7 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -99,11 +95,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -113,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
/**
* Cache-based IGFS implementation.
@@ -129,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
/** Default permissions for file system entry. */
private static final String PERMISSION_DFLT_VAL = "0777";
+ /** Index generator for async format threads. */
+ private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
/** Default directory metadata. */
static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
@@ -168,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
/** Writers map. */
private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>();
- /** Delete futures. */
- private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>();
-
- /** Delete message listener. */
- private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
- /** Format discovery listener. */
- private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
/** Local metrics holder. */
private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
/** Client log directory. */
private volatile String logDir;
- /** Message topic. */
- private Object topic;
-
/** Eviction policy (if set). */
private IgfsPerBlockLruEvictionPolicy evictPlc;
@@ -291,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
- igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
}
@@ -331,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
// Restore interrupted flag.
if (interrupted)
Thread.currentThread().interrupt();
@@ -1380,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
/** {@inheritDoc} */
@Override public void format() {
try {
- formatAsync().get();
+ IgniteUuid id = meta.format();
+
+ // If ID is null, then file system is already empty.
+ if (id == null)
+ return;
+
+ while (true) {
+ if (enterBusy()) {
+ try {
+ if (!meta.exists(id))
+ return;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ U.sleep(10);
+ }
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1393,33 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
* @return Future.
*/
IgniteInternalFuture<?> formatAsync() {
- try {
- IgniteUuid id = meta.format();
+ GridFutureAdapter<?> fut = new GridFutureAdapter<>();
- if (id == null)
- return new GridFinishedFuture<Object>();
- else {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+ Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
+ FORMAT_THREAD_IDX_GEN.incrementAndGet());
- GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
+ t.setDaemon(true);
- if (oldFut != null)
- return oldFut;
- else {
- if (!meta.exists(id)) {
- // Safety in case response message was received before we put future into collection.
- fut.onDone();
+ t.start();
- delFuts.remove(id, fut);
- }
-
- return fut;
- }
- }
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<Object>(e);
- }
+ return fut;
}
/**
@@ -1445,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
}
- /**
- * Check whether IGFS with the same name exists among provided attributes.
- *
- * @param attrs Attributes.
- * @return {@code True} in case IGFS with the same name exists among provided attributes
- */
- private boolean sameIgfs(IgfsAttributes[] attrs) {
- if (attrs != null) {
- String igfsName = name();
-
- for (IgfsAttributes attr : attrs) {
- if (F.eq(igfsName, attr.igfsName()))
- return true;
- }
- }
- return false;
- }
-
/** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1868,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- /**
- * Format message listener required for format action completion.
- */
- private class FormatMessageListener implements GridMessageListener {
- /** {@inheritDoc} */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void onMessage(UUID nodeId, Object msg) {
- if (msg instanceof IgfsDeleteMessage) {
- ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
- if (node != null) {
- if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
- IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
- try {
- msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
-
- return;
- }
-
- assert msg0.id() != null;
-
- GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
- if (fut != null) {
- if (msg0.error() == null)
- fut.onDone();
- else
- fut.onDone(msg0.error());
- }
- }
- }
- }
- }
- }
-
- /**
- * Discovery listener required for format actions completion.
- */
- private class FormatDiscoveryListener implements GridLocalEventListener {
- /** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
- DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
- if (evt0.eventNode() != null) {
- if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
- Collection<IgniteUuid> rmv = new HashSet<>();
-
- for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
- IgniteUuid id = fut.getKey();
-
- try {
- if (!meta.exists(id)) {
- fut.getValue().onDone();
-
- rmv.add(id);
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to check file existence: " + id, e);
- }
- }
-
- for (IgniteUuid id : rmv)
- delFuts.remove(id);
- }
- }
- }
- }
-
/** {@inheritDoc} */
@Override public IgniteUuid nextAffinityKey() {
return safeOp(new Callable<IgniteUuid>() {
@@ -2042,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
return t;
}
}
+
+ /**
+ * Format runnable.
+ */
+ private class FormatRunnable implements Runnable {
+ /** Target future. */
+ private final GridFutureAdapter<?> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param fut Future.
+ */
+ public FormatRunnable(GridFutureAdapter<?> fut) {
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgfsException err = null;
+
+ try {
+ format();
+ }
+ catch (Throwable err0) {
+ err = IgfsUtils.toIgfsException(err0);
+ }
+ finally {
+ if (err == null)
+ fut.onDone();
+ else
+ fut.onDone(err);
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/062dc4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 6fa9877..cfe549f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -180,7 +180,7 @@ public class IgfsUtils {
* @return Converted IGFS exception.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public static IgfsException toIgfsException(Exception err) {
+ public static IgfsException toIgfsException(Throwable err) {
IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
IgfsException igfsErr = X.cause(err, IgfsException.class);