You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/06 14:28:32 UTC

ignite git commit: IGNITE-3260: Removed delete message routines.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3260 c84140784 -> 062dc4ea7


IGNITE-3260: Removed delete message routines.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/062dc4ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/062dc4ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/062dc4ea

Branch: refs/heads/ignite-3260
Commit: 062dc4ea78b6616e7ef4f944614728acd2331661
Parents: c841407
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 17:27:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 17:27:48 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsDeleteWorker.java       |  42 ----
 .../internal/processors/igfs/IgfsImpl.java      | 212 ++++++-------------
 .../internal/processors/igfs/IgfsUtils.java     |   2 +-
 3 files changed, 66 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/062dc4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bae9354..310090d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 
@@ -37,8 +34,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-
 /**
  * IGFS worker for removal from the trash directory.
  */
@@ -49,9 +44,6 @@ public class IgfsDeleteWorker extends IgfsThread {
     /** How many files/folders to delete at once (i.e in a single transaction). */
     private static final int MAX_DELETE_BATCH = 100;
 
-    /** IGFS context. */
-    private final IgfsContext igfsCtx;
-
     /** Metadata manager. */
     private final IgfsMetaManager meta;
 
@@ -73,9 +65,6 @@ public class IgfsDeleteWorker extends IgfsThread {
     /** Cancellation flag. */
     private volatile boolean cancelled;
 
-    /** Message topic. */
-    private Object topic;
-
     /**
      * Constructor.
      *
@@ -84,15 +73,9 @@ public class IgfsDeleteWorker extends IgfsThread {
     IgfsDeleteWorker(IgfsContext igfsCtx) {
         super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
 
-        this.igfsCtx = igfsCtx;
-
         meta = igfsCtx.meta();
         data = igfsCtx.data();
 
-        String igfsName = igfsCtx.igfs().name();
-
-        topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
         assert meta != null;
         assert data != null;
 
@@ -189,8 +172,6 @@ public class IgfsDeleteWorker extends IgfsThread {
                             if (log.isDebugEnabled())
                                 log.debug("Sending delete confirmation message [name=" + entry.getKey() +
                                     ", fileId=" + fileId + ']');
-
-                            sendDeleteMessage(new IgfsDeleteMessage(fileId));
                         }
                     }
                     else
@@ -201,8 +182,6 @@ public class IgfsDeleteWorker extends IgfsThread {
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
-
-                    sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
                 }
             }
         }
@@ -346,25 +325,4 @@ public class IgfsDeleteWorker extends IgfsThread {
                 return true; // Directory entry was deleted concurrently.
         }
     }
-
-    /**
-     * Send delete message to all meta cache nodes in the grid.
-     *
-     * @param msg Message to send.
-     */
-    private void sendDeleteMessage(IgfsDeleteMessage msg) {
-        assert msg != null;
-
-        Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
-        for (ClusterNode node : nodes) {
-            try {
-                igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
-                    ", msg=" + msg + ", err=" + e.getMessage() + ']');
-            }
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/062dc4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index bc461b7..262dfef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsInvalidPathException;
 import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,7 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -99,11 +95,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -113,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
 
 /**
  * Cache-based IGFS implementation.
@@ -129,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
     /** Default permissions for file system entry. */
     private static final String PERMISSION_DFLT_VAL = "0777";
 
+    /** Index generator for async format threads. */
+    private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
     /** Default directory metadata. */
     static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
 
@@ -168,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
     /** Writers map. */
     private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>();
 
-    /** Delete futures. */
-    private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>();
-
-    /** Delete message listener. */
-    private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
-    /** Format discovery listener. */
-    private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
     /** Local metrics holder. */
     private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
 
     /** Client log directory. */
     private volatile String logDir;
 
-    /** Message topic. */
-    private Object topic;
-
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
@@ -291,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         }
 
-        topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
-        igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
         dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
             new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
     }
@@ -331,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         }
 
-        igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
         // Restore interrupted flag.
         if (interrupted)
             Thread.currentThread().interrupt();
@@ -1380,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
     /** {@inheritDoc} */
     @Override public void format() {
         try {
-            formatAsync().get();
+            IgniteUuid id = meta.format();
+
+            // If ID is null, then file system is already empty.
+            if (id == null)
+                return;
+
+            while (true) {
+                if (enterBusy()) {
+                    try {
+                        if (!meta.exists(id))
+                            return;
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+
+                U.sleep(10);
+            }
         }
         catch (Exception e) {
             throw IgfsUtils.toIgfsException(e);
@@ -1393,33 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
      * @return Future.
      */
     IgniteInternalFuture<?> formatAsync() {
-        try {
-            IgniteUuid id = meta.format();
+        GridFutureAdapter<?> fut = new GridFutureAdapter<>();
 
-            if (id == null)
-                return new GridFinishedFuture<Object>();
-            else {
-                GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+        Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
+            FORMAT_THREAD_IDX_GEN.incrementAndGet());
 
-                GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
+        t.setDaemon(true);
 
-                if (oldFut != null)
-                    return oldFut;
-                else {
-                    if (!meta.exists(id)) {
-                        // Safety in case response message was received before we put future into collection.
-                        fut.onDone();
+        t.start();
 
-                        delFuts.remove(id, fut);
-                    }
-
-                    return fut;
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<Object>(e);
-        }
+        return fut;
     }
 
     /**
@@ -1445,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
         return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
     }
 
-    /**
-     * Check whether IGFS with the same name exists among provided attributes.
-     *
-     * @param attrs Attributes.
-     * @return {@code True} in case IGFS with the same name exists among provided attributes
-     */
-    private boolean sameIgfs(IgfsAttributes[] attrs) {
-        if (attrs != null) {
-            String igfsName = name();
-
-            for (IgfsAttributes attr : attrs) {
-                if (F.eq(igfsName, attr.igfsName()))
-                    return true;
-            }
-        }
-        return false;
-    }
-
     /** {@inheritDoc} */
     @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
         Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1868,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
         }
     }
 
-    /**
-     * Format message listener required for format action completion.
-     */
-    private class FormatMessageListener implements GridMessageListener {
-        /** {@inheritDoc} */
-        @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-        @Override public void onMessage(UUID nodeId, Object msg) {
-            if (msg instanceof IgfsDeleteMessage) {
-                ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
-                if (node != null) {
-                    if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
-                        IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
-                        try {
-                            msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
-
-                            return;
-                        }
-
-                        assert msg0.id() != null;
-
-                        GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
-                        if (fut != null) {
-                            if (msg0.error() == null)
-                                fut.onDone();
-                            else
-                                fut.onDone(msg0.error());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Discovery listener required for format actions completion.
-     */
-    private class FormatDiscoveryListener implements GridLocalEventListener {
-        /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
-            DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
-            if (evt0.eventNode() != null) {
-                if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
-                    Collection<IgniteUuid> rmv = new HashSet<>();
-
-                    for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
-                        IgniteUuid id = fut.getKey();
-
-                        try {
-                            if (!meta.exists(id)) {
-                                fut.getValue().onDone();
-
-                                rmv.add(id);
-                            }
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to check file existence: " + id, e);
-                        }
-                    }
-
-                    for (IgniteUuid id : rmv)
-                        delFuts.remove(id);
-                }
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid nextAffinityKey() {
         return safeOp(new Callable<IgniteUuid>() {
@@ -2042,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
             return t;
         }
     }
+
+    /**
+     * Format runnable.
+     */
+    private class FormatRunnable implements Runnable {
+        /** Target future. */
+        private final GridFutureAdapter<?> fut;
+
+        /**
+         * Constructor.
+         *
+         * @param fut Future.
+         */
+        public FormatRunnable(GridFutureAdapter<?> fut) {
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgfsException err = null;
+
+            try {
+                format();
+            }
+            catch (Throwable err0) {
+                err = IgfsUtils.toIgfsException(err0);
+            }
+            finally {
+                if (err == null)
+                    fut.onDone();
+                else
+                    fut.onDone(err);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/062dc4ea/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 6fa9877..cfe549f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -180,7 +180,7 @@ public class IgfsUtils {
      * @return Converted IGFS exception.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public static IgfsException toIgfsException(Exception err) {
+    public static IgfsException toIgfsException(Throwable err) {
         IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
 
         IgfsException igfsErr = X.cause(err, IgfsException.class);