You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/17 11:50:03 UTC
[1/4] ignite git commit: IGNITE-2846: IGFS: Reworked
IgfsMetaManager.updateInfo() operation to use "invoke" instead of "put".
Repository: ignite
Updated Branches:
refs/heads/ignite-1786 d208e44be -> 929189111
IGNITE-2846: IGFS: Reworked IgfsMetaManager.updateInfo() operation to use "invoke" instead of "put".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1efc5a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1efc5a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1efc5a0
Branch: refs/heads/ignite-1786
Commit: a1efc5a06b15acffa40ad0a9d3352206061b42f6
Parents: dfe5ea8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 16 13:22:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 16 13:22:07 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsFileMap.java | 9 +-
.../igfs/IgfsFragmentizerManager.java | 166 ++++++++++++++-----
.../igfs/IgfsInvalidRangeException.java | 4 +-
.../processors/igfs/IgfsMetaManager.java | 67 ++++----
.../igfs/IgfsMetaManagerSelfTest.java | 11 --
5 files changed, 161 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
index 2c0358b..9ea69ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFileMap.java
@@ -128,12 +128,11 @@ public class IgfsFileMap implements Externalizable {
*
* @param range Range to update status.
* @param status New range status.
- * @throws IgniteCheckedException If range was not found.
*/
- public void updateRangeStatus(IgfsFileAffinityRange range, int status) throws IgniteCheckedException {
+ public void updateRangeStatus(IgfsFileAffinityRange range, int status) {
if (ranges == null)
throw new IgfsInvalidRangeException("Failed to update range status (file map is empty) " +
- "[range=" + range + ", ranges=" + ranges + ']');
+ "[range=" + range + ", ranges=null]");
assert !ranges.isEmpty();
@@ -190,10 +189,10 @@ public class IgfsFileMap implements Externalizable {
*
* @param range Range to delete.
*/
- public void deleteRange(IgfsFileAffinityRange range) throws IgniteCheckedException {
+ public void deleteRange(IgfsFileAffinityRange range) {
if (ranges == null)
throw new IgfsInvalidRangeException("Failed to remove range (file map is empty) " +
- "[range=" + range + ", ranges=" + ranges + ']');
+ "[range=" + range + ", ranges=null]");
assert !ranges.isEmpty();
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 899730d..7cc5cb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.processors.igfs;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
@@ -41,19 +45,22 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -286,7 +293,8 @@ public class IgfsFragmentizerManager extends IgfsManager {
switch (range.status()) {
case RANGE_STATUS_INITIAL: {
// Mark range as moving.
- updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVING));
+ updated = igfsCtx.meta().updateInfo(
+ fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVING));
if (updated == null) {
igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -302,7 +310,8 @@ public class IgfsFragmentizerManager extends IgfsManager {
igfsCtx.data().spreadBlocks(fileInfo, range);
// Mark range as moved.
- updated = igfsCtx.meta().updateInfo(fileId, updateRange(range, RANGE_STATUS_MOVED));
+ updated = igfsCtx.meta().updateInfo(
+ fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVED));
if (updated == null) {
igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -318,7 +327,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
igfsCtx.data().cleanBlocks(fileInfo, range, false);
// Remove range from map.
- updated = igfsCtx.meta().updateInfo(fileId, deleteRange(range));
+ updated = igfsCtx.meta().updateInfo(fileId, new RangeDeleteProcessor(range));
if (updated == null)
igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -334,57 +343,132 @@ public class IgfsFragmentizerManager extends IgfsManager {
}
/**
- * Creates update info closure that will mark given range as moving.
- *
- * @param range Range to mark as moving.
- * @param status Status.
- * @return Update closure.
+ * Update range processor.
*/
- private IgniteClosure<IgfsFileInfo, IgfsFileInfo> updateRange(final IgfsFileAffinityRange range,
- final int status) {
- return new CX1<IgfsFileInfo, IgfsFileInfo>() {
- @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException {
- IgfsFileMap map = new IgfsFileMap(info.fileMap());
+ private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
- map.updateRangeStatus(range, status);
+ /** Range. */
+ private IgfsFileAffinityRange range;
- if (log.isDebugEnabled())
- log.debug("Updated file map for range [fileId=" + info.id() + ", range=" + range +
- ", status=" + status + ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');
+ /** Status. */
+ private int status;
- IgfsFileInfo updated = new IgfsFileInfo(info, info.length());
+ /**
+ * Constructor.
+ */
+ public RangeUpdateProcessor() {
+ // No-op.
+ }
- updated.fileMap(map);
+ /**
+ * Constructor.
+ *
+ * @param range Range.
+ * @param status Status.
+ */
+ public RangeUpdateProcessor(IgfsFileAffinityRange range, int status) {
+ this.range = range;
+ this.status = status;
+ }
- return updated;
- }
- };
+ /** {@inheritDoc} */
+ @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo oldInfo = entry.getValue();
+
+ IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+ newMap.updateRangeStatus(range, status);
+
+ IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length());
+
+ newInfo.fileMap(newMap);
+
+ entry.setValue(newInfo);
+
+ return newInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(range);
+ out.writeInt(status);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ range = (IgfsFileAffinityRange)in.readObject();
+ status = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RangeUpdateProcessor.class, this);
+ }
}
/**
- * Creates update info closure that will mark given range as moving.
- *
- * @param range Range to mark as moving.
- * @return Update closure.
+ * Delete range processor.
*/
- private IgniteClosure<IgfsFileInfo, IgfsFileInfo> deleteRange(final IgfsFileAffinityRange range) {
- return new CX1<IgfsFileInfo, IgfsFileInfo>() {
- @Override public IgfsFileInfo applyx(IgfsFileInfo info) throws IgniteCheckedException {
- IgfsFileMap map = new IgfsFileMap(info.fileMap());
+ private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
- map.deleteRange(range);
+ /** Range. */
+ private IgfsFileAffinityRange range;
- if (log.isDebugEnabled())
- log.debug("Deleted range from file map [fileId=" + info.id() + ", range=" + range +
- ", oldMap=" + info.fileMap() + ", newMap=" + map + ']');
+ /**
+ * Constructor.
+ */
+ public RangeDeleteProcessor() {
+ // No-op.
+ }
- IgfsFileInfo updated = new IgfsFileInfo(info, info.length());
+ /**
+ * Constructor.
+ *
+ * @param range Range.
+ */
+ public RangeDeleteProcessor(IgfsFileAffinityRange range) {
+ this.range = range;
+ }
- updated.fileMap(map);
+ /** {@inheritDoc} */
+ @Override public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo oldInfo = entry.getValue();
- return updated;
- }
- };
+ IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+ newMap.deleteRange(range);
+
+ IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length());
+
+ newInfo.fileMap(newMap);
+
+ entry.setValue(newInfo);
+
+ return newInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(range);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ range = (IgfsFileAffinityRange)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RangeDeleteProcessor.class, this);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
index cd93278..d6ad2b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInvalidRangeException.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.processors.igfs;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
/**
* Internal exception thrown when attempted to update range that is no longer present
* in file affinity map.
*/
-public class IgfsInvalidRangeException extends IgniteCheckedException {
+public class IgfsInvalidRangeException extends IgniteException {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 2a85cf8..b4774f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1257,7 +1257,7 @@ public class IgfsMetaManager extends IgfsManager {
// Remove listing entries from root.
for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
- id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemove(entry.getKey(), entry.getValue().fileId()));
+ id2InfoPrj.invoke(IgfsUtils.ROOT_ID, new ListingRemoveProcessor(entry.getKey(), entry.getValue().fileId()));
resId = newInfo.id();
}
@@ -1408,7 +1408,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsListingEntry childEntry = parentInfo.listing().get(name);
if (childEntry != null)
- id2InfoPrj.invoke(parentId, new ListingRemove(name, id));
+ id2InfoPrj.invoke(parentId, new ListingRemoveProcessor(name, id));
id2InfoPrj.remove(id);
@@ -1584,20 +1584,20 @@ public class IgfsMetaManager extends IgfsManager {
* Update file info in cache.
*
* @param fileId File ID to update information for.
- * @param c Closure to update file's info inside transaction.
+ * @param proc Entry processor to invoke.
* @return Updated file info or {@code null} if such file ID not found.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public IgfsFileInfo updateInfo(IgniteUuid fileId, IgniteClosure<IgfsFileInfo, IgfsFileInfo> c)
- throws IgniteCheckedException {
+ @Nullable public IgfsFileInfo updateInfo(IgniteUuid fileId,
+ EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo> proc) throws IgniteCheckedException {
validTxState(false);
assert fileId != null;
- assert c != null;
+ assert proc != null;
if (busyLock.enterBusy()) {
try {
if (log.isDebugEnabled())
- log.debug("Update file info [fileId=" + fileId + ", c=" + c + ']');
+ log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']');
IgniteInternalTx tx = startTx();
@@ -1608,27 +1608,21 @@ public class IgfsMetaManager extends IgfsManager {
if (oldInfo == null)
return null; // File not found.
- IgfsFileInfo newInfo = c.apply(oldInfo);
+ IgfsFileInfo newInfo = invokeAndGet(fileId, proc);
if (newInfo == null)
throw fsException("Failed to update file info with null value" +
- " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
+ " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
if (!oldInfo.id().equals(newInfo.id()))
throw fsException("Failed to update file info (file IDs differ)" +
- " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
+ " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
if (oldInfo.isDirectory() != newInfo.isDirectory())
throw fsException("Failed to update file info (file types differ)" +
- " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", c=" + c + ']');
-
- boolean b = id2InfoPrj.replace(fileId, oldInfo, newInfo);
-
- assert b : "Inconsistent transaction state [oldInfo=" + oldInfo + ", newInfo=" + newInfo +
- ", c=" + c + ']';
+ " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
- if (tx != null)
- tx.commit();
+ tx.commit();
return newInfo;
}
@@ -1636,8 +1630,7 @@ public class IgfsMetaManager extends IgfsManager {
throw U.cast(e);
}
finally {
- if (tx != null)
- tx.close();
+ tx.close();
}
}
finally {
@@ -1814,7 +1807,7 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException("Failed to create new metadata entry due to ID conflict: " + info.id());
if (parentId != null)
- id2InfoPrj.invoke(parentId, new ListingAdd(name, new IgfsListingEntry(info)));
+ id2InfoPrj.invoke(parentId, new ListingAddProcessor(name, new IgfsListingEntry(info)));
}
/**
@@ -1831,8 +1824,8 @@ public class IgfsMetaManager extends IgfsManager {
IgniteUuid destId, String destName) throws IgniteCheckedException {
validTxState(true);
- id2InfoPrj.invoke(srcId, new ListingRemove(srcName, entry.fileId()));
- id2InfoPrj.invoke(destId, new ListingAdd(destName, entry));
+ id2InfoPrj.invoke(srcId, new ListingRemoveProcessor(srcName, entry.fileId()));
+ id2InfoPrj.invoke(destId, new ListingAddProcessor(destName, entry));
}
/**
@@ -1857,7 +1850,7 @@ public class IgfsMetaManager extends IgfsManager {
private void invokeUpdatePath(IgniteUuid id, IgfsPath path) throws IgniteCheckedException {
validTxState(true);
- id2InfoPrj.invoke(id, new UpdatePath(path));
+ id2InfoPrj.invoke(id, new UpdatePathProcessor(path));
}
/**
@@ -2009,7 +2002,7 @@ public class IgfsMetaManager extends IgfsManager {
id2InfoPrj.remove(oldId); // Remove the old one.
id2InfoPrj.invoke(parentInfo.id(),
- new ListingRemove(path.name(), parentInfo.listing().get(path.name()).fileId()));
+ new ListingRemoveProcessor(path.name(), parentInfo.listing().get(path.name()).fileId()));
createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
@@ -3101,7 +3094,7 @@ public class IgfsMetaManager extends IgfsManager {
* Remove entry from directory listing.
*/
@GridInternal
- private static final class ListingRemove implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ private static final class ListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -3115,7 +3108,7 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Default constructor.
*/
- public ListingRemove() {
+ public ListingRemoveProcessor() {
// No-op.
}
@@ -3125,7 +3118,7 @@ public class IgfsMetaManager extends IgfsManager {
* @param fileName File name.
* @param fileId File ID.
*/
- public ListingRemove(String fileName, IgniteUuid fileId) {
+ public ListingRemoveProcessor(String fileName, IgniteUuid fileId) {
this.fileName = fileName;
this.fileId = fileId;
}
@@ -3173,7 +3166,7 @@ public class IgfsMetaManager extends IgfsManager {
* Update directory listing closure.
*/
@GridInternal
- private static final class ListingAdd implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ private static final class ListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -3190,7 +3183,7 @@ public class IgfsMetaManager extends IgfsManager {
* @param fileName File name to add into parent listing.
* @param entry Listing entry to add or remove.
*/
- private ListingAdd(String fileName, IgfsListingEntry entry) {
+ private ListingAddProcessor(String fileName, IgfsListingEntry entry) {
assert fileName != null;
assert entry != null;
@@ -3202,7 +3195,7 @@ public class IgfsMetaManager extends IgfsManager {
* Empty constructor required for {@link Externalizable}.
*
*/
- public ListingAdd() {
+ public ListingAddProcessor() {
// No-op.
}
@@ -3242,7 +3235,7 @@ public class IgfsMetaManager extends IgfsManager {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(ListingAdd.class, this);
+ return S.toString(ListingAddProcessor.class, this);
}
}
@@ -3250,7 +3243,7 @@ public class IgfsMetaManager extends IgfsManager {
* Update path closure.
*/
@GridInternal
- private static final class UpdatePath implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ private static final class UpdatePathProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -3261,14 +3254,14 @@ public class IgfsMetaManager extends IgfsManager {
/**
* @param path Path.
*/
- private UpdatePath(IgfsPath path) {
+ private UpdatePathProcessor(IgfsPath path) {
this.path = path;
}
/**
* Default constructor (required by Externalizable).
*/
- public UpdatePath() {
+ public UpdatePathProcessor() {
// No-op.
}
@@ -3293,7 +3286,7 @@ public class IgfsMetaManager extends IgfsManager {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(UpdatePath.class, this);
+ return S.toString(UpdatePathProcessor.class, this);
}
}
@@ -3665,7 +3658,7 @@ public class IgfsMetaManager extends IgfsManager {
leafParentId = parentId;
// Now link the newly created directory chain to the lowermost existing parent:
- id2InfoPrj.invoke(lowermostExistingId, new ListingAdd(childName, childInfo));
+ id2InfoPrj.invoke(lowermostExistingId, new ListingAddProcessor(childName, childInfo));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1efc5a0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 72a2bee..19a91ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -352,17 +352,6 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(b.id()));
- // Validate last actual data received from 'remove' operation.
- IgfsFileInfo newF2 = mgr.updateInfo(f2.id(), new C1<IgfsFileInfo, IgfsFileInfo>() {
- @Override public IgfsFileInfo apply(IgfsFileInfo e) {
- return new IgfsFileInfo(e, e.length() + 20);
- }
- });
-
- assertNotNull(newF2);
- assertEquals(f2.id(), newF2.id());
- assertNotSame(f2, newF2);
-
del = mgr.softDelete(path("/a/f2"), false);
assertEquals(f2.id(), del);
[3/4] ignite git commit: IGNITE-2853 - Fixed cancellation of the job
that depends on a service
Posted by vo...@apache.org.
IGNITE-2853 - Fixed cancellation of the job that depends on a service
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3420e6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3420e6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3420e6b
Branch: refs/heads/ignite-1786
Commit: d3420e6bc5e833a6eb1daaad25b11843f97328d5
Parents: 69d1f4b
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 16 22:21:24 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 16 22:21:24 2016 -0700
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../ComputeJobCancelWithServiceSelfTest.java | 154 +++++++++++++++++++
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
3 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5d8daf6..8df89f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -849,6 +849,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
+ startProcessor(new GridServiceProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
startProcessor(new GridTaskProcessor(ctx));
@@ -860,7 +861,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
- startProcessor(new GridServiceProcessor(ctx));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
new file mode 100644
index 0000000..2718ed9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test cancellation of a job that depends on service.
+ */
+public class ComputeJobCancelWithServiceSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJobCancel() throws Exception {
+ Ignite server = startGrid("server");
+
+ server.services().deployNodeSingleton("my-service", new MyService());
+
+ Ignition.setClientMode(true);
+
+ Ignite client = startGrid("client");
+
+ IgniteCompute compute = client.compute().withAsync();
+
+ compute.execute(new MyTask(), null);
+
+ ComputeTaskFuture<Integer> fut = compute.future();
+
+ Thread.sleep(3000);
+
+ server.close();
+
+ assertEquals(42, fut.get().intValue());
+ }
+
+ /** */
+ private static class MyService implements Service {
+ /** */
+ private volatile boolean cancelled;
+
+ /** {@inheritDoc} */
+ @Override public void init(ServiceContext ctx) throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ cancelled = true;
+ }
+
+ /**
+ * @return Response.
+ */
+ public int hello() {
+ assertFalse("Service already cancelled!", cancelled);
+
+ return 42;
+ }
+ }
+
+ /** */
+ private static class MyTask extends ComputeTaskSplitAdapter<Object, Integer> {
+ /** {@inheritDoc} */
+ @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
+ return Collections.singletonList(new ComputeJobAdapter() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override
+ public Object execute() throws IgniteException {
+ MyService svc = ignite.services().service("my-service");
+
+ while (!isCancelled()) {
+ try {
+ Thread.sleep(1000);
+
+ svc.hello();
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ }
+
+ assertTrue(isCancelled());
+
+ return svc.hello();
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer reduce(List<ComputeJobResult> results) {
+ assertEquals(1, results.size());
+
+ return results.get(0).getData();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 6233bab..a8d6e5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.ComputeJobCancelWithServiceSelfTest;
import org.apache.ignite.internal.GridCommunicationSelfTest;
import org.apache.ignite.internal.GridDiscoveryEventSelfTest;
import org.apache.ignite.internal.GridDiscoverySelfTest;
@@ -114,6 +115,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteUpdateNotifierPerClusterSettingSelfTest.class);
suite.addTestSuite(GridLocalEventListenerSelfTest.class);
suite.addTestSuite(IgniteTopologyPrintFormatSelfTest.class);
+ suite.addTestSuite(ComputeJobCancelWithServiceSelfTest.class);
// Managed Services.
suite.addTestSuite(GridServiceProcessorSingleNodeSelfTest.class);
[2/4] ignite git commit: IGNITE-2796 NPE during rebalancing
Posted by vo...@apache.org.
IGNITE-2796 NPE during rebalancing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69d1f4b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69d1f4b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69d1f4b7
Branch: refs/heads/ignite-1786
Commit: 69d1f4b7774769ca7334b2290b492c8e0f0a8ddb
Parents: a1efc5a
Author: Anton Vinogradov <av...@apache.org>
Authored: Wed Mar 16 13:33:26 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Wed Mar 16 13:33:26 2016 +0300
----------------------------------------------------------------------
.../dht/preloader/GridDhtPartitionDemander.java | 29 ++++++++++++--------
1 file changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/69d1f4b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 273b603..0cf974f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -841,6 +841,8 @@ public class GridDhtPartitionDemander {
*/
private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
synchronized (this) {
+ assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+
remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
}
}
@@ -972,22 +974,25 @@ public class GridDhtPartitionDemander {
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
exchFut.discoveryEvent());
- Collection<Integer> parts = remaining.get(nodeId).get2();
+ T2<Long, Collection<Integer>> t = remaining.get(nodeId);
- if (parts != null) {
- boolean rmvd = parts.remove(p);
+ assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+ ", part=" + p + "]";
- assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
- ", part=" + p + ", left=" + parts + "]";
+ Collection<Integer> parts = t.get2();
- if (parts.isEmpty()) {
- U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
- "rebalancing [cache=" + cctx.name() +
- ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
- ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
+ boolean rmvd = parts.remove(p);
- remaining.remove(nodeId);
- }
+ assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+ ", part=" + p + ", left=" + parts + "]";
+
+ if (parts.isEmpty()) {
+ U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
+ "rebalancing [cache=" + cctx.name() +
+ ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
+ ", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
+
+ remaining.remove(nodeId);
}
checkIsDone();
[4/4] ignite git commit: Merge branch 'master' into ignite-1786
Posted by vo...@apache.org.
Merge branch 'master' into ignite-1786
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92918911
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92918911
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92918911
Branch: refs/heads/ignite-1786
Commit: 929189111dd2ee48fe1b3cf07ca11bb5642790d0
Parents: d208e44 d3420e6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 17 13:49:55 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 17 13:49:55 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 29 ++--
.../internal/processors/igfs/IgfsFileMap.java | 9 +-
.../igfs/IgfsFragmentizerManager.java | 166 ++++++++++++++-----
.../igfs/IgfsInvalidRangeException.java | 4 +-
.../processors/igfs/IgfsMetaManager.java | 67 ++++----
.../ComputeJobCancelWithServiceSelfTest.java | 154 +++++++++++++++++
.../igfs/IgfsMetaManagerSelfTest.java | 11 --
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
9 files changed, 335 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/92918911/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index be3aee9,8df89f3..21fbc47
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -850,7 -849,7 +850,8 @@@ public class IgniteKernal implements Ig
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
+ startProcessor(new OdbcProcessor(ctx));
+ startProcessor(new GridServiceProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
startProcessor(new GridTaskProcessor(ctx));