You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/17 11:50:03 UTC

[1/4] ignite git commit: IGNITE-2846: IGFS: Reworked IgfsMetaManager.updateInfo() operation to use "invoke" instead of "put".

Repository: ignite
Updated Branches:
  refs/heads/ignite-1786 d208e44be -> 929189111


IGNITE-2846: IGFS: Reworked IgfsMetaManager.updateInfo() operation to use "invoke" instead of "put".


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1efc5a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1efc5a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1efc5a0

Branch: refs/heads/ignite-1786
Commit: a1efc5a06b15acffa40ad0a9d3352206061b42f6
Parents: dfe5ea8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 16 13:22:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 16 13:22:07 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsFileMap.java   |   9 +-
 .../igfs/IgfsFragmentizerManager.java           | 166 ++++++++++++++-----
 .../igfs/IgfsInvalidRangeException.java         |   4 +-
 .../processors/igfs/IgfsMetaManager.java        |  67 ++++----
 .../igfs/IgfsMetaManagerSelfTest.java           |  11 --
 5 files changed, 161 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
index 2c0358b..9ea69ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
@@ -128,12 +128,11 @@ public class IgfsFileMap implements Externalizable {
      *
      * @param range Range to update status.
      * @param status New range status.
-     * @throws IgniteCheckedException If range was not found.
      */
-    public void updateRangeStatus(IgfsFileAffinityRange range, int status) throws IgniteCheckedException {
+    public void updateRangeStatus(IgfsFileAffinityRange range, int status) {
         if (ranges == null)
             throw new IgfsInvalidRangeException("Failed to update range status (file map is empty) " +
-                "[range=" + range + ", ranges=" + ranges + ']');
+                "[range=" + range + ", ranges=null]");
 
         assert !ranges.isEmpty();
 
@@ -190,10 +189,10 @@ public class IgfsFileMap implements Externalizable {
      *
      * @param range Range to delete.
      */
-    public void deleteRange(IgfsFileAffinityRange range) throws IgniteCheckedException {
+    public void deleteRange(IgfsFileAffinityRange range) {
         if (ranges == null)
             throw new IgfsInvalidRangeException("Failed to remove range (file map is empty) " +
-                "[range=" + range + ", ranges=" + ranges + ']');
+                "[range=" + range + ", ranges=null]");
 
         assert !ranges.isEmpty();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 899730d..7cc5cb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -41,19 +45,22 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -286,7 +293,8 @@ public class IgfsFragmentizerManager extends IgfsManager {
                 switch (range.status()) {
                     case RANGE_STATUS_INITIAL: {
                         // Mark range as moving.
-                        updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVING));
+                        updated = igfsCtx.meta().updateInfo(
+                            fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVING));
 
                         if (updated == null) {
                             igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -302,7 +310,8 @@ public class IgfsFragmentizerManager extends IgfsManager {
                         igfsCtx.data().spreadBlocks(fileInfo, range);
 
                         // Mark range as moved.
-                        updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVED));
+                        updated = igfsCtx.meta().updateInfo(
+                            fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVED));
 
                         if (updated == null) {
                             igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -318,7 +327,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
                         igfsCtx.data().cleanBlocks(fileInfo, range, false);
 
                         // Remove range from map.
-                        updated = igfsCtx.meta().updateInfo(fileId, deleteRange(range));
+                        updated = igfsCtx.meta().updateInfo(fileId, new RangeDeleteProcessor(range));
 
                         if (updated == null)
                             igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -334,57 +343,132 @@ public class IgfsFragmentizerManager extends IgfsManager {
     }
 
     /**
-     * Creates update info closure that will mark given range as moving.
-     *
-     * @param range Range to mark as moving.
-     * @param status Status.
-     * @return Update closure.
+     * Update range processor.
      */
-    private IgniteClosure<IgfsFileInfo, IgfsFileInfo> updateRange(final IgfsFileAffinityRange range,
-        final int status) {
-        return new CX1<IgfsFileInfo, IgfsFileInfo>() {
-            @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException {
-                IgfsFileMap map = new IgfsFileMap(info.fileMap());
+    private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-                map.updateRangeStatus(range, status);
+        /** Range. */
+        private IgfsFileAffinityRange range;
 
-                if (log.isDebugEnabled())
-                    log.debug("Updated file map for range [fileId=" + info.id() + ", range=" + range +
-                        ", status=" + status + ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');
+        /** Status. */
+        private int status;
 
-                IgfsFileInfo updated = new IgfsFileInfo(info, info.length());
+        /**
+         * Constructor.
+         */
+        public RangeUpdateProcessor() {
+            // No-op.
+        }
 
-                updated.fileMap(map);
+        /**
+         * Constructor.
+         *
+         * @param range Range.
+         * @param status Status.
+         */
+        public RangeUpdateProcessor(IgfsFileAffinityRange range, int status) {
+            this.range = range;
+            this.status = status;
+        }
 
-                return updated;
-            }
-        };
+        /** {@inheritDoc} */
+        @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo oldInfo = entry.getValue();
+
+            IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+            newMap.updateRangeStatus(range, status);
+
+            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length());
+
+            newInfo.fileMap(newMap);
+
+            entry.setValue(newInfo);
+
+            return newInfo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(range);
+            out.writeInt(status);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            range = (IgfsFileAffinityRange)in.readObject();
+            status = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RangeUpdateProcessor.class, this);
+        }
     }
 
     /**
-     * Creates update info closure that will mark given range as moving.
-     *
-     * @param range Range to mark as moving.
-     * @return Update closure.
+     * Delete range processor.
      */
-    private IgniteClosure<IgfsFileInfo, IgfsFileInfo> deleteRange(final IgfsFileAffinityRange range) {
-        return new CX1<IgfsFileInfo, IgfsFileInfo>() {
-            @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException {
-                IgfsFileMap map = new IgfsFileMap(info.fileMap());
+    private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-                map.deleteRange(range);
+        /** Range. */
+        private IgfsFileAffinityRange range;
 
-                if (log.isDebugEnabled())
-                    log.debug("Deleted range from file map [fileId=" + info.id() + ", range=" + range +
-                        ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');
+        /**
+         * Constructor.
+         */
+        public RangeDeleteProcessor() {
+            // No-op.
+        }
 
-                IgfsFileInfo updated = new IgfsFileInfo(info, info.length());
+        /**
+         * Constructor.
+         *
+         * @param range Range.
+         */
+        public RangeDeleteProcessor(IgfsFileAffinityRange range) {
+            this.range = range;
+        }
 
-                updated.fileMap(map);
+        /** {@inheritDoc} */
+        @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo oldInfo = entry.getValue();
 
-                return updated;
-            }
-        };
+            IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+            newMap.deleteRange(range);
+
+            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length());
+
+            newInfo.fileMap(newMap);
+
+            entry.setValue(newInfo);
+
+            return newInfo;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(range);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            range = (IgfsFileAffinityRange)in.readObject();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RangeDeleteProcessor.class, this);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
index cd93278..d6ad2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.processors.igfs;
 
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 
 /**
  * Internal exception thrown when attempted to update range that is no longer present
  * in file affinity map.
  */
-public class IgfsInvalidRangeException extends IgniteCheckedException {
+public class IgfsInvalidRangeException extends IgniteException {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 2a85cf8..b4774f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1257,7 +1257,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                 // Remove listing entries from root.
                 for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
-                    id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId()));
+                    id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemoveProcessor(entry.getKey(), entry.getValue().fileId()));
 
                 resId = newInfo.id();
             }
@@ -1408,7 +1408,7 @@ public class IgfsMetaManager extends IgfsManager {
                         IgfsListingEntry childEntry = parentInfo.listing().get(name);
 
                         if (childEntry != null)
-                            id2InfoPrj.invoke(parentId, new ListingRemove(name, id));
+                            id2InfoPrj.invoke(parentId, new ListingRemoveProcessor(name, id));
 
                         id2InfoPrj.remove(id);
 
@@ -1584,20 +1584,20 @@ public class IgfsMetaManager extends IgfsManager {
      * Update file info in cache.
      *
      * @param fileId File ID to update information for.
-     * @param c Closure to update file's info inside transaction.
+     * @param proc Entry processor to invoke.
      * @return Updated file info or {@code null} if such file ID not found.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public IgfsFileInfo updateInfo(IgniteUuid fileId, IgniteClosure<IgfsFileInfo, IgfsFileInfo> c)
-        throws IgniteCheckedException {
+    @Nullable public IgfsFileInfo updateInfo(IgniteUuid fileId,
+        EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo> proc) throws IgniteCheckedException {
         validTxState(false);
         assert fileId != null;
-        assert c != null;
+        assert proc != null;
 
         if (busyLock.enterBusy()) {
             try {
                 if (log.isDebugEnabled())
-                    log.debug("Update file info [fileId=" + fileId + ", c=" + c + ']');
+                    log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']');
 
                 IgniteInternalTx tx = startTx();
 
@@ -1608,27 +1608,21 @@ public class IgfsMetaManager extends IgfsManager {
                     if (oldInfo == null)
                         return null; // File not found.
 
-                    IgfsFileInfo newInfo = c.apply(oldInfo);
+                    IgfsFileInfo newInfo = invokeAndGet(fileId, proc);
 
                     if (newInfo == null)
                         throw fsException("Failed to update file info with null value" +
-                            " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
+                            " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
 
                     if (!oldInfo.id().equals(newInfo.id()))
                         throw fsException("Failed to update file info (file IDs differ)" +
-                            " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
+                            " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
 
                     if (oldInfo.isDirectory() != newInfo.isDirectory())
                         throw fsException("Failed to update file info (file types differ)" +
-                            " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
-
-                    boolean b = id2InfoPrj.replace(fileId, oldInfo, newInfo);
-
-                    assert b : "Inconsistent transaction state [oldInfo=" + oldInfo + ", newInfo=" + newInfo +
-                        ", c=" + c + ']';
+                            " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
 
-                    if (tx != null)
-                        tx.commit();
+                    tx.commit();
 
                     return newInfo;
                 }
@@ -1636,8 +1630,7 @@ public class IgfsMetaManager extends IgfsManager {
                     throw U.cast(e);
                 }
                 finally {
-                    if (tx != null)
-                        tx.close();
+                    tx.close();
                 }
             }
             finally {
@@ -1814,7 +1807,7 @@ public class IgfsMetaManager extends IgfsManager {
             throw fsException("Failed to create new metadata entry due to ID conflict: " + info.id());
 
         if (parentId != null)
-            id2InfoPrj.invoke(parentId, new ListingAdd(name, new IgfsListingEntry(info)));
+            id2InfoPrj.invoke(parentId, new ListingAddProcessor(name, new IgfsListingEntry(info)));
     }
 
     /**
@@ -1831,8 +1824,8 @@ public class IgfsMetaManager extends IgfsManager {
         IgniteUuid destId, String destName) throws IgniteCheckedException {
         validTxState(true);
 
-        id2InfoPrj.invoke(srcId, new ListingRemove(srcName, entry.fileId()));
-        id2InfoPrj.invoke(destId, new ListingAdd(destName, entry));
+        id2InfoPrj.invoke(srcId, new ListingRemoveProcessor(srcName, entry.fileId()));
+        id2InfoPrj.invoke(destId, new ListingAddProcessor(destName, entry));
     }
 
     /**
@@ -1857,7 +1850,7 @@ public class IgfsMetaManager extends IgfsManager {
     private void invokeUpdatePath(IgniteUuid id, IgfsPath path) throws IgniteCheckedException {
         validTxState(true);
 
-        id2InfoPrj.invoke(id, new UpdatePath(path));
+        id2InfoPrj.invoke(id, new UpdatePathProcessor(path));
     }
 
     /**
@@ -2009,7 +2002,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                                 id2InfoPrj.remove(oldId); // Remove the old one.
                                 id2InfoPrj.invoke(parentInfo.id(),
-                                    new ListingRemove(path.name(), parentInfo.listing().get(path.name()).fileId()));
+                                    new ListingRemoveProcessor(path.name(), parentInfo.listing().get(path.name()).fileId()));
 
                                 createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
 
@@ -3101,7 +3094,7 @@ public class IgfsMetaManager extends IgfsManager {
      * Remove entry from directory listing.
      */
     @GridInternal
-    private static final class ListingRemove implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+    private static final class ListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -3115,7 +3108,7 @@ public class IgfsMetaManager extends IgfsManager {
         /**
          * Default constructor.
          */
-        public ListingRemove() {
+        public ListingRemoveProcessor() {
             // No-op.
         }
 
@@ -3125,7 +3118,7 @@ public class IgfsMetaManager extends IgfsManager {
          * @param fileName File name.
          * @param fileId File ID.
          */
-        public ListingRemove(String fileName, IgniteUuid fileId) {
+        public ListingRemoveProcessor(String fileName, IgniteUuid fileId) {
             this.fileName = fileName;
             this.fileId = fileId;
         }
@@ -3173,7 +3166,7 @@ public class IgfsMetaManager extends IgfsManager {
      * Update directory listing closure.
      */
     @GridInternal
-    private static final class ListingAdd implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+    private static final class ListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -3190,7 +3183,7 @@ public class IgfsMetaManager extends IgfsManager {
          * @param fileName File name to add into parent listing.
          * @param entry Listing entry to add or remove.
          */
-        private ListingAdd(String fileName, IgfsListingEntry entry) {
+        private ListingAddProcessor(String fileName, IgfsListingEntry entry) {
             assert fileName != null;
             assert entry != null;
 
@@ -3202,7 +3195,7 @@ public class IgfsMetaManager extends IgfsManager {
          * Empty constructor required for {@link Externalizable}.
          *
          */
-        public ListingAdd() {
+        public ListingAddProcessor() {
             // No-op.
         }
 
@@ -3242,7 +3235,7 @@ public class IgfsMetaManager extends IgfsManager {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(ListingAdd.class, this);
+            return S.toString(ListingAddProcessor.class, this);
         }
     }
 
@@ -3250,7 +3243,7 @@ public class IgfsMetaManager extends IgfsManager {
      * Update path closure.
      */
     @GridInternal
-    private static final class UpdatePath implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+    private static final class UpdatePathProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
         Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
@@ -3261,14 +3254,14 @@ public class IgfsMetaManager extends IgfsManager {
         /**
          * @param path Path.
          */
-        private UpdatePath(IgfsPath path) {
+        private UpdatePathProcessor(IgfsPath path) {
             this.path = path;
         }
 
         /**
          * Default constructor (required by Externalizable).
          */
-        public UpdatePath() {
+        public UpdatePathProcessor() {
             // No-op.
         }
 
@@ -3293,7 +3286,7 @@ public class IgfsMetaManager extends IgfsManager {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(UpdatePath.class, this);
+            return S.toString(UpdatePathProcessor.class, this);
         }
     }
 
@@ -3665,7 +3658,7 @@ public class IgfsMetaManager extends IgfsManager {
             leafParentId = parentId;
 
             // Now link the newly created directory chain to the lowermost existing parent:
-            id2InfoPrj.invoke(lowermostExistingId, new ListingAdd(childName, childInfo));
+            id2InfoPrj.invoke(lowermostExistingId, new ListingAddProcessor(childName, childInfo));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 72a2bee..19a91ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -352,17 +352,6 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
 
         assertEmpty(mgr.directoryListing(b.id()));
 
-        // Validate last actual data received from 'remove' operation.
-        IgfsFileInfo newF2 = mgr.updateInfo(f2.id(), new C1<IgfsFileInfo, IgfsFileInfo>() {
-            @Override public IgfsFileInfo apply(IgfsFileInfo e) {
-                return new IgfsFileInfo(e, e.length() + 20);
-            }
-        });
-
-        assertNotNull(newF2);
-        assertEquals(f2.id(), newF2.id());
-        assertNotSame(f2, newF2);
-
         del = mgr.softDelete(path("/a/f2"), false);
         assertEquals(f2.id(), del);
 


[3/4] ignite git commit: IGNITE-2853 - Fixed cancellation of the job that depends on a service

Posted by vo...@apache.org.
IGNITE-2853 - Fixed cancellation of the job that depends on a service


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3420e6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3420e6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3420e6b

Branch: refs/heads/ignite-1786
Commit: d3420e6bc5e833a6eb1daaad25b11843f97328d5
Parents: 69d1f4b
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 16 22:21:24 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 16 22:21:24 2016 -0700

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   2 +-
 .../ComputeJobCancelWithServiceSelfTest.java    | 154 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 3 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5d8daf6..8df89f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -849,6 +849,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
             startProcessor(new GridCacheProcessor(ctx));
             startProcessor(new GridQueryProcessor(ctx));
+            startProcessor(new GridServiceProcessor(ctx));
             startProcessor(new GridTaskSessionProcessor(ctx));
             startProcessor(new GridJobProcessor(ctx));
             startProcessor(new GridTaskProcessor(ctx));
@@ -860,7 +861,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
                 IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
                 IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
-            startProcessor(new GridServiceProcessor(ctx));
             startProcessor(new DataStructuresProcessor(ctx));
             startProcessor(createComponent(PlatformProcessor.class, ctx));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
new file mode 100644
index 0000000..2718ed9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that a job depending on a service can still call that service after the job has been cancelled.
+ */
+public class ComputeJobCancelWithServiceSelfTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI in {@link #getConfiguration(String)}. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJobCancel() throws Exception {
+        Ignite server = startGrid("server");
+
+        server.services().deployNodeSingleton("my-service", new MyService());
+
+        Ignition.setClientMode(true);
+
+        Ignite client = startGrid("client");
+
+        IgniteCompute compute = client.compute().withAsync();
+
+        compute.execute(new MyTask(), null);
+
+        ComputeTaskFuture<Integer> fut = compute.future();
+
+        Thread.sleep(3000); // Presumably gives the job time to start polling the service; TODO confirm.
+
+        server.close();
+
+        assertEquals(42, fut.get().intValue()); // 42 is the value of MyService.hello() returned by the job.
+    }
+
+    /** Service whose {@link #hello()} asserts that the service has not been cancelled. */
+    private static class MyService implements Service {
+        /** Set to {@code true} by {@link #cancel(ServiceContext)}. */
+        private volatile boolean cancelled;
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            cancelled = true;
+        }
+
+        /**
+         * @return Always {@code 42}; asserts first that the service has not been cancelled.
+         */
+        public int hello() {
+            assertFalse("Service already cancelled!", cancelled);
+
+            return 42;
+        }
+    }
+
+    /** Task that splits into a single job which calls the service every second until the job is cancelled. */
+    private static class MyTask extends ComputeTaskSplitAdapter<Object, Integer> {
+        /** {@inheritDoc} */
+        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
+            return Collections.singletonList(new ComputeJobAdapter() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override
+                public Object execute() throws IgniteException {
+                    MyService svc = ignite.services().service("my-service");
+
+                    while (!isCancelled()) {
+                        try {
+                            Thread.sleep(1000);
+
+                            svc.hello();
+                        }
+                        catch (InterruptedException e) {
+                            // No-op: the loop condition decides when to stop.
+                        }
+                    }
+
+                    assertTrue(isCancelled());
+
+                    return svc.hello();
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer reduce(List<ComputeJobResult> results) {
+            assertEquals(1, results.size());
+
+            return results.get(0).getData();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 6233bab..a8d6e5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import java.util.Set;
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.ComputeJobCancelWithServiceSelfTest;
 import org.apache.ignite.internal.GridCommunicationSelfTest;
 import org.apache.ignite.internal.GridDiscoveryEventSelfTest;
 import org.apache.ignite.internal.GridDiscoverySelfTest;
@@ -114,6 +115,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteUpdateNotifierPerClusterSettingSelfTest.class);
         suite.addTestSuite(GridLocalEventListenerSelfTest.class);
         suite.addTestSuite(IgniteTopologyPrintFormatSelfTest.class);
+        suite.addTestSuite(ComputeJobCancelWithServiceSelfTest.class);
 
         // Managed Services.
         suite.addTestSuite(GridServiceProcessorSingleNodeSelfTest.class);


[2/4] ignite git commit: IGNITE-2796 NPE during rebalancing

Posted by vo...@apache.org.
 IGNITE-2796 NPE during rebalancing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69d1f4b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69d1f4b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69d1f4b7

Branch: refs/heads/ignite-1786
Commit: 69d1f4b7774769ca7334b2290b492c8e0f0a8ddb
Parents: a1efc5a
Author: Anton Vinogradov <av...@apache.org>
Authored: Wed Mar 16 13:33:26 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Mar 16 13:33:26 2016 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 29 ++++++++++++--------
 1 file changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/69d1f4b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 273b603..0cf974f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -841,6 +841,8 @@ public class GridDhtPartitionDemander {
          */
         private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
             synchronized (this) {
+                assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+
                 remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
             }
         }
@@ -972,22 +974,25 @@ public class GridDhtPartitionDemander {
                     preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
                         exchFut.discoveryEvent());
 
-                Collection<Integer> parts = remaining.get(nodeId).get2();
+                T2<Long, Collection<Integer>> t = remaining.get(nodeId);
 
-                if (parts != null) {
-                    boolean rmvd = parts.remove(p);
+                assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                    ", part=" + p + "]";
 
-                    assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
-                        ", part=" + p + ", left=" + parts + "]";
+                Collection<Integer> parts = t.get2();
 
-                    if (parts.isEmpty()) {
-                        U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
-                            "rebalancing [cache=" + cctx.name() +
-                            ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
-                            ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+                boolean rmvd = parts.remove(p);
 
-                        remaining.remove(nodeId);
-                    }
+                assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                    ", part=" + p + ", left=" + parts + "]";
+
+                if (parts.isEmpty()) {
+                    U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+                        "rebalancing [cache=" + cctx.name() +
+                        ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+                        ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
+
+                    remaining.remove(nodeId);
                 }
 
                 checkIsDone();


[4/4] ignite git commit: Merge branch 'master' into ignite-1786

Posted by vo...@apache.org.
Merge branch 'master' into ignite-1786

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92918911
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92918911
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92918911

Branch: refs/heads/ignite-1786
Commit: 929189111dd2ee48fe1b3cf07ca11bb5642790d0
Parents: d208e44 d3420e6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 17 13:49:55 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 17 13:49:55 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   2 +-
 .../dht/preloader/GridDhtPartitionDemander.java |  29 ++--
 .../internal/processors/igfs/IgfsFileMap.java   |   9 +-
 .../igfs/IgfsFragmentizerManager.java           | 166 ++++++++++++++-----
 .../igfs/IgfsInvalidRangeException.java         |   4 +-
 .../processors/igfs/IgfsMetaManager.java        |  67 ++++----
 .../ComputeJobCancelWithServiceSelfTest.java    | 154 +++++++++++++++++
 .../igfs/IgfsMetaManagerSelfTest.java           |  11 --
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 9 files changed, 335 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/92918911/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index be3aee9,8df89f3..21fbc47
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -850,7 -849,7 +850,8 @@@ public class IgniteKernal implements Ig
              startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
              startProcessor(new GridCacheProcessor(ctx));
              startProcessor(new GridQueryProcessor(ctx));
 +            startProcessor(new OdbcProcessor(ctx));
+             startProcessor(new GridServiceProcessor(ctx));
              startProcessor(new GridTaskSessionProcessor(ctx));
              startProcessor(new GridJobProcessor(ctx));
              startProcessor(new GridTaskProcessor(ctx));