You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:40 UTC
[19/28] incubator-ignite git commit: ignite-545: merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index db3d350..ed8e573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -198,19 +198,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
// Remap regular mappings.
final Buffer buf = bufMappings.remove(id);
+ // Only async notification is possible since
+ // discovery thread may be trapped otherwise.
if (buf != null) {
- // Only async notification is possible since
- // discovery thread may be trapped otherwise.
- ctx.closure().callLocalSafe(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- buf.onNodeLeft();
-
- return null;
- }
- },
- true /* system pool */
- );
+ waitAffinityAndRun(new Runnable() {
+ @Override public void run() {
+ buf.onNodeLeft();
+ }
+ }, discoEvt.topologyVersion(), true);
}
}
};
@@ -248,6 +243,31 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
+ * @param c Closure to run.
+ * @param topVer Topology version to wait for.
+ * @param async Async flag.
+ */
+ private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) {
+ AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0);
+
+ IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0);
+
+ if (fut != null && !fut.isDone()) {
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ ctx.closure().runLocalSafe(c, true);
+ }
+ });
+ }
+ else {
+ if (async)
+ ctx.closure().runLocalSafe(c, true);
+ else
+ c.run();
+ }
+ }
+
+ /**
* @return Cache object context.
*/
public CacheObjectContext cacheObjectContext() {
@@ -527,6 +547,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
boolean initPda = ctx.deploy().enabled() && jobPda == null;
+ AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
for (DataStreamerEntry entry : entries) {
List<ClusterNode> nodes;
@@ -543,7 +565,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
initPda = false;
}
- nodes = nodes(key);
+ nodes = nodes(key, topVer);
}
catch (IgniteCheckedException e) {
resFut.onDone(e);
@@ -621,10 +643,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
};
- GridFutureAdapter<?> f;
+ final GridFutureAdapter<?> f;
try {
- f = buf.update(entriesForNode, lsnr);
+ f = buf.update(entriesForNode, topVer, lsnr);
}
catch (IgniteInterruptedCheckedException e1) {
resFut.onDone(e1);
@@ -633,30 +655,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
if (ctx.discovery().node(nodeId) == null) {
- if (bufMappings.remove(nodeId, buf))
- buf.onNodeLeft();
+ if (bufMappings.remove(nodeId, buf)) {
+ final Buffer buf0 = buf;
+
+ waitAffinityAndRun(new Runnable() {
+ @Override public void run() {
+ buf0.onNodeLeft();
- if (f != null)
- f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + nodeId));
+ if (f != null)
+ f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+ "(node has left): " + nodeId));
+ }
+ }, ctx.discovery().topologyVersion(), false);
+ }
}
}
}
/**
* @param key Key to map.
+ * @param topVer Topology version.
* @return Nodes to send requests to.
* @throws IgniteCheckedException If failed.
*/
- private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException {
+ private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException {
GridAffinityProcessor aff = ctx.affinity();
List<ClusterNode> res = null;
if (!allowOverwrite())
- res = aff.mapKeyToPrimaryAndBackups(cacheName, key);
+ res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer);
else {
- ClusterNode node = aff.mapKeyToNode(cacheName, key);
+ ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer);
if (node != null)
res = Collections.singletonList(node);
@@ -959,11 +989,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* @param newEntries Infos.
+ * @param topVer Topology version.
* @param lsnr Listener for the operation future.
* @throws IgniteInterruptedCheckedException If failed.
* @return Future for operation.
*/
@Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+ AffinityTopologyVersion topVer,
IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
List<DataStreamerEntry> entries0 = null;
GridFutureAdapter<Object> curFut0;
@@ -986,7 +1018,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
if (entries0 != null) {
- submit(entries0, curFut0);
+ submit(entries0, topVer, curFut0);
if (cancelled)
curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
@@ -1023,7 +1055,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
if (entries0 != null)
- submit(entries0, curFut0);
+ submit(entries0, null, curFut0);
// Create compound future for this flush.
GridCompoundFuture<Object, Object> res = null;
@@ -1068,10 +1100,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/**
* @param entries Entries to submit.
+ * @param topVer Topology version.
* @param curFut Current future.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut)
+ private void submit(final Collection<DataStreamerEntry> entries,
+ @Nullable AffinityTopologyVersion topVer,
+ final GridFutureAdapter<Object> curFut)
throws IgniteInterruptedCheckedException {
assert entries != null;
assert !entries.isEmpty();
@@ -1160,6 +1195,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+ if (topVer == null)
+ topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
DataStreamerRequest req = new DataStreamerRequest(
reqId,
topicBytes,
@@ -1174,7 +1212,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
dep != null ? dep.participants() : null,
dep != null ? dep.classLoaderId() : null,
dep == null,
- ctx.cache().context().exchange().readyAffinityVersion());
+ topVer);
try {
ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
@@ -1422,6 +1460,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
cctx.evicts().touch(entry, topVer);
CU.unwindEvicts(cctx);
+
+ entry.onUnlock();
}
catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
// No-op.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 72911af..aa3bfe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.datastructures;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -32,6 +33,7 @@ import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
+import javax.cache.event.*;
import javax.cache.processor.*;
import java.io.*;
import java.util.*;
@@ -40,7 +42,6 @@ import java.util.concurrent.*;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -99,6 +100,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** */
private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
+ /** */
+ private volatile UUID qryId;
+
/**
* @param ctx Context.
*/
@@ -112,7 +116,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Override public void onKernalStart() {
+ @Override public void onKernalStart() throws IgniteCheckedException {
if (ctx.config().isDaemon())
return;
@@ -139,11 +143,35 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
seqView = atomicsCache;
- dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
+ dsCacheCtx = atomicsCache.context();
}
}
/**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startQuery() throws IgniteCheckedException {
+ if (qryId == null) {
+ synchronized (this) {
+ if (qryId == null) {
+ qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+ new DataStructuresEntryFilter(),
+ dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+ false);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ super.onKernalStop(cancel);
+
+ if (qryId != null)
+ dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
+ }
+
+ /**
* Gets a sequence from cache or creates one if it's not cached.
*
* @param name Sequence name.
@@ -161,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() {
@Override public IgniteAtomicSequence applyx() throws IgniteCheckedException {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -287,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() {
@Override public IgniteAtomicLong applyx() throws IgniteCheckedException {
final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -490,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() {
@Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -591,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() {
@Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException {
GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name);
@@ -899,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() {
@Override public IgniteCountDownLatch applyx() throws IgniteCheckedException {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -906,8 +944,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx.gate().enter();
try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheCountDownLatchValue val = cast(dsView.get(key),
- GridCacheCountDownLatchValue.class);
+ GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
// Check that count down hasn't been created in other thread yet.
GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);
@@ -1034,28 +1071,46 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
- * Transaction committed callback for transaction manager.
*
- * @param tx Committed transaction.
*/
- public <K, V> void onTxCommitted(IgniteInternalTx tx) {
- if (dsCacheCtx == null)
- return;
+ static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
- if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) {
- Collection<IgniteTxEntry> entries = tx.writeEntries();
+ /** {@inheritDoc} */
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+ if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
+ return evt.getValue() instanceof GridCacheCountDownLatchValue;
+ else {
+ assert evt.getEventType() == EventType.REMOVED : evt;
- if (log.isDebugEnabled())
- log.debug("Committed entries: " + entries);
+ return true;
+ }
+ }
- for (IgniteTxEntry entry : entries) {
- // Check updated or created GridCacheInternalKey keys.
- if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) {
- GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataStructuresEntryFilter.class, this);
+ }
+ }
- Object val0 = CU.value(entry.value(), entry.context(), false);
+ /**
+ *
+ */
+ private class DataStructuresEntryListener implements
+ CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(
+ Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+ throws CacheEntryListenerException
+ {
+ for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
+ if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
+ GridCacheInternal val0 = evt.getValue();
if (val0 instanceof GridCacheCountDownLatchValue) {
+ GridCacheInternalKey key = evt.getKey();
+
// Notify latch on changes.
GridCacheRemovable latch = dsMap.get(key);
@@ -1067,8 +1122,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
latch0.onUpdate(val.get());
if (val.get() == 0 && val.autoDelete()) {
- entry.cached().markObsolete(dsCacheCtx.versions().next());
-
dsMap.remove(key);
latch.onRemoved();
@@ -1080,11 +1133,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
", actual=" + latch.getClass() + ", value=" + latch + ']');
}
}
+
}
+ else {
+ assert evt.getEventType() == EventType.REMOVED : evt;
- // Check deleted GridCacheInternal keys.
- if (entry.op() == DELETE && entry.key().internal()) {
- GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+ GridCacheInternal key = evt.getKey();
// Entry's val is null if entry deleted.
GridCacheRemovable obj = dsMap.remove(key);
@@ -1094,6 +1148,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
}
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataStructuresEntryListener.class, this);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
/**
* Cleans up the job staging directory.
*/
- void cleanupStagingDirectory();
+ public void cleanupStagingDirectory();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..3d2ee17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,13 +21,14 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Task context.
*/
public abstract class HadoopTaskContext {
/** */
- private final HadoopJob job;
+ protected final HadoopJob job;
/** */
private HadoopTaskInput input;
@@ -187,4 +188,15 @@ public abstract class HadoopTaskContext {
* @throws IgniteCheckedException If failed.
*/
public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+ /**
+ * Executes a callable on behalf of the job owner.
+ * In case of embedded task execution the implementation of this method
+ * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with.
+ * @param c The callable.
+ * @param <T> The return type of the Callable.
+ * @return The result of the callable.
+ * @throws IgniteCheckedException On any error in callable.
+ */
+ public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 7c1a837..361f75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
/** Property name for URI of file system. */
public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
- /** Property name for user name of file system. */
- public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
+ /** Property name for default user name of file system.
+ * NOTE: for secondary file system this is just a default user name, which is used
+ * when the 2ndary filesystem is used outside of any user context.
+ * If another user name is set in the context, 2ndary file system will work on behalf
+ * of that user, which is different from the default. */
+ public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
/**
* Stops IGFS cleaning all used resources.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 34636d2..c3495e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx {
for (IgfsFileWorkerBatch batch : workerMap.values())
batch.cancel();
- if (secondaryFs instanceof AutoCloseable)
- U.closeQuiet((AutoCloseable)secondaryFs);
+ try {
+ secondaryFs.close();
+ }
+ catch (Exception e) {
+ log.error("Failed to close secondary file system.", e);
+ }
}
igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..cfe6ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
private final int bufSize; // Buffer size. Must not be less then file block size.
/** IGFS instance for this handler. */
- private IgfsEx igfs;
+ private final IgfsEx igfs;
/** Resource ID generator. */
- private AtomicLong rsrcIdGen = new AtomicLong();
+ private final AtomicLong rsrcIdGen = new AtomicLong();
/** Stopping flag. */
private volatile boolean stopping;
@@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler {
* @return Response message.
* @throws IgniteCheckedException If failed.
*/
- private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+ private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
IgfsMessage msg) throws IgniteCheckedException {
- IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+ final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
if (log.isDebugEnabled())
log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
- IgfsControlResponse res = new IgfsControlResponse();
+ final IgfsControlResponse res = new IgfsControlResponse();
+
+ final String userName = req.userName();
+
+ assert userName != null;
try {
- switch (cmd) {
- case EXISTS:
- res.response(igfs.exists(req.path()));
+ IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+ @Override public Void apply() {
+ switch (cmd) {
+ case EXISTS:
+ res.response(igfs.exists(req.path()));
- break;
+ break;
- case INFO:
- res.response(igfs.info(req.path()));
+ case INFO:
+ res.response(igfs.info(req.path()));
- break;
+ break;
- case PATH_SUMMARY:
- res.response(igfs.summary(req.path()));
+ case PATH_SUMMARY:
+ res.response(igfs.summary(req.path()));
- break;
+ break;
- case UPDATE:
- res.response(igfs.update(req.path(), req.properties()));
+ case UPDATE:
+ res.response(igfs.update(req.path(), req.properties()));
- break;
+ break;
- case RENAME:
- igfs.rename(req.path(), req.destinationPath());
+ case RENAME:
+ igfs.rename(req.path(), req.destinationPath());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case DELETE:
- res.response(igfs.delete(req.path(), req.flag()));
+ case DELETE:
+ res.response(igfs.delete(req.path(), req.flag()));
- break;
+ break;
- case MAKE_DIRECTORIES:
- igfs.mkdirs(req.path(), req.properties());
+ case MAKE_DIRECTORIES:
+ igfs.mkdirs(req.path(), req.properties());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case LIST_PATHS:
- res.paths(igfs.listPaths(req.path()));
+ case LIST_PATHS:
+ res.paths(igfs.listPaths(req.path()));
- break;
+ break;
- case LIST_FILES:
- res.files(igfs.listFiles(req.path()));
+ case LIST_FILES:
+ res.files(igfs.listFiles(req.path()));
- break;
+ break;
- case SET_TIMES:
- igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+ case SET_TIMES:
+ igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
- res.response(true);
+ res.response(true);
- break;
+ break;
- case AFFINITY:
- res.locations(igfs.affinity(req.path(), req.start(), req.length()));
+ case AFFINITY:
+ res.locations(igfs.affinity(req.path(), req.start(), req.length()));
- break;
+ break;
- case OPEN_READ: {
- IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
- igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+ case OPEN_READ: {
+ IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+ igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
- long streamId = registerResource(ses, igfsIn);
+ long streamId = registerResource(ses, igfsIn);
- if (log.isDebugEnabled())
- log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
- igfsIn.fileInfo().modificationTime());
+ IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+ igfsIn.fileInfo().modificationTime());
- res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+ res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
- break;
- }
+ break;
+ }
- case OPEN_CREATE: {
- long streamId = registerResource(ses, igfs.create(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Overwrite if exists.
- affinityKey(req), // Affinity key based on replication factor.
- req.replication(),// Replication factor.
- req.blockSize(), // Block size.
- req.properties() // File properties.
- ));
+ case OPEN_CREATE: {
+ long streamId = registerResource(ses, igfs.create(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Overwrite if exists.
+ affinityKey(req), // Affinity key based on replication factor.
+ req.replication(),// Replication factor.
+ req.blockSize(), // Block size.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- case OPEN_APPEND: {
- long streamId = registerResource(ses, igfs.append(
- req.path(), // Path.
- bufSize, // Buffer size.
- req.flag(), // Create if absent.
- req.properties() // File properties.
- ));
+ case OPEN_APPEND: {
+ long streamId = registerResource(ses, igfs.append(
+ req.path(), // Path.
+ bufSize, // Buffer size.
+ req.flag(), // Create if absent.
+ req.properties() // File properties.
+ ));
- if (log.isDebugEnabled())
- log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
- req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+ if (log.isDebugEnabled())
+ log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+ req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(streamId);
+ res.response(streamId);
- break;
- }
+ break;
+ }
- default:
- assert false : "Unhandled path control request command: " + cmd;
+ default:
+ assert false : "Unhandled path control request command: " + cmd;
- break;
- }
+ break;
+ }
+
+ return null;
+ }
+ });
}
catch (IgniteException e) {
throw new IgniteCheckedException(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index e33e0d4..b98c5d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -669,7 +669,7 @@ public class IgfsMetaManager extends IgfsManager {
private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) throws IgniteCheckedException {
assert fileId != null;
- IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singletonList(fileId)).get(fileId) :
+ IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singleton(fileId)).get(fileId) :
id2InfoPrj.get(fileId);
return info == null ? Collections.<String, IgfsListingEntry>emptyMap() : info.listing();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..44ee90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
*/
class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
/** Delegate. */
- private final IgfsImpl igfs;
+ private final IgfsEx igfs;
/**
* Constructor.
*
* @param igfs Delegate.
*/
- IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+ IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
this.igfs = igfs;
}
@@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public Map<String, String> properties() {
return Collections.emptyMap();
}
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
*/
private class ClientWorker extends GridWorker {
/** Connected client endpoint. */
- private IpcEndpoint endpoint;
+ private final IpcEndpoint endpoint;
/** Data output stream. */
private final IgfsDataOutputStream out;
/** Client session object. */
- private IgfsClientSession ses;
+ private final IgfsClientSession ses;
/** Queue node for fast unlink. */
private ConcurrentLinkedDeque8.Node<ClientWorker> node;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 4b0234f..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
import java.lang.reflect.*;
@@ -88,4 +90,18 @@ public class IgfsUtils {
private IgfsUtils() {
// No-op.
}
+
+ /**
+ * Provides non-null user name.
+ * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+ * which is the current process owner user.
+ * @param user a user name to be fixed.
+ * @return non-null interned user name.
+ */
+ public static String fixUserName(@Nullable String user) {
+ if (F.isEmpty(user))
+ user = FileSystemConfiguration.DFLT_USER_NAME;
+
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index cd4d543..ed8e1e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -121,7 +121,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (F.isEmpty(meta.getValueType()))
throw new IgniteCheckedException("Value type is not set: " + meta);
- TypeDescriptor desc = new TypeDescriptor(ccfg);
+ TypeDescriptor desc = new TypeDescriptor();
Class<?> valCls = U.classForName(meta.getValueType(), null);
@@ -160,7 +160,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
Class<?> keyCls = clss[i];
Class<?> valCls = clss[i + 1];
- TypeDescriptor desc = processKeyAndValueClasses(ccfg, keyCls, valCls);
+ TypeDescriptor desc = processKeyAndValueClasses(keyCls, valCls);
addTypeByName(ccfg, desc);
types.put(new TypeId(ccfg.getName(), valCls), desc);
@@ -188,15 +188,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
- * @param ccfg Cache configuration.
* @param keyCls Key class.
* @param valCls Value class.
* @return Type descriptor.
* @throws IgniteCheckedException If failed.
*/
- private TypeDescriptor processKeyAndValueClasses(CacheConfiguration<?,?> ccfg, Class<?> keyCls, Class<?> valCls)
+ private TypeDescriptor processKeyAndValueClasses(
+ Class<?> keyCls,
+ Class<?> valCls
+ )
throws IgniteCheckedException {
- TypeDescriptor d = new TypeDescriptor(ccfg);
+ TypeDescriptor d = new TypeDescriptor();
d.keyClass(keyCls);
d.valueClass(valCls);
@@ -318,7 +320,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
try {
- return rebuildIndexes(space, typesByName.get(new TypeName(space, valTypeName)));
+ return rebuildIndexes(
+ space,
+ typesByName.get(
+ new TypeName(
+ space,
+ valTypeName)));
}
finally {
busyLock.leaveBusy();
@@ -539,7 +546,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- return idx.queryTwoStep(ctx.cache().internalCache(space).context(), qry);
+ return idx.queryTwoStep(
+ ctx.cache().internalCache(space).context(),
+ qry);
}
finally {
busyLock.leaveBusy();
@@ -589,59 +598,62 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param qry Query.
* @return Cursor.
*/
- public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(GridCacheContext<?,?> cctx, SqlQuery qry) {
+ public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(final GridCacheContext<?, ?> cctx, final SqlQuery qry) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- String space = cctx.name();
- String type = qry.getType();
- String sqlQry = qry.getSql();
- Object[] params = qry.getArgs();
-
- TypeDescriptor typeDesc = typesByName.get(new TypeName(space, type));
-
- if (typeDesc == null || !typeDesc.registered())
- throw new CacheException("Failed to find SQL table for type: " + type);
-
- final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc,
- idx.backupFilter());
-
- if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- ctx.discovery().localNode(),
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- null,
- null,
- sqlQry,
- null,
- null,
- params,
- null,
- null));
- }
-
- return new ClIter<Cache.Entry<K,V>>() {
- @Override public void close() throws Exception {
- i.close();
- }
-
- @Override public boolean hasNext() {
- return i.hasNext();
- }
-
- @Override public Cache.Entry<K,V> next() {
- IgniteBiTuple<K,V> t = i.next();
-
- return new CacheEntryImpl<>(t.getKey(), t.getValue());
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ return executeQuery(
+ cctx,
+ new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() {
+ @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
+ String space = cctx.name();
+ String type = qry.getType();
+ String sqlQry = qry.getSql();
+ Object[] params = qry.getArgs();
+
+ TypeDescriptor typeDesc = typesByName.get(
+ new TypeName(
+ space,
+ type));
+
+ if (typeDesc == null || !typeDesc.registered())
+ throw new CacheException("Failed to find SQL table for type: " + type);
+
+ final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.query(
+ space,
+ sqlQry,
+ F.asList(params),
+ typeDesc,
+ idx.backupFilter());
+
+ sendQueryExecutedEvent(
+ sqlQry,
+ params);
+
+ return new ClIter<Cache.Entry<K, V>>() {
+ @Override public void close() throws Exception {
+ i.close();
+ }
+
+ @Override public boolean hasNext() {
+ return i.hasNext();
+ }
+
+ @Override public Cache.Entry<K, V> next() {
+ IgniteBiTuple<K, V> t = i.next();
+
+ return new CacheEntryImpl<>(
+ t.getKey(),
+ t.getValue());
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ });
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -652,6 +664,28 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param sqlQry Sql query.
+ * @param params Params.
+ */
+ private void sendQueryExecutedEvent(String sqlQry, Object[] params) {
+ if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+ ctx.event().record(new CacheQueryExecutedEvent<>(
+ ctx.discovery().localNode(),
+ "SQL query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.SQL.name(),
+ null,
+ null,
+ sqlQry,
+ null,
+ null,
+ params,
+ null,
+ null));
+ }
+ }
+
+ /**
* @return Message factory for {@link GridIoManager}.
*/
public MessageFactory messageFactory() {
@@ -670,39 +704,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param qry Query.
* @return Iterator.
*/
- public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
+ public QueryCursor<List<?>> queryLocalFields(final GridCacheContext<?,?> cctx, final SqlFieldsQuery qry) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");
try {
- String space = cctx.name();
- String sql = qry.getSql();
- Object[] args = qry.getArgs();
-
- GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
-
- if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
- ctx.event().record(new CacheQueryExecutedEvent<>(
- ctx.discovery().localNode(),
- "SQL query executed.",
- EVT_CACHE_QUERY_EXECUTED,
- CacheQueryType.SQL.name(),
- null,
- null,
- sql,
- null,
- null,
- args,
- null,
- null));
- }
+ return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
+ @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
+ String space = cctx.name();
+ String sql = qry.getSql();
+ Object[] args = qry.getArgs();
- QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+ GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
- cursor.fieldsMeta(res.metaData());
+ sendQueryExecutedEvent(sql, args);
- return cursor;
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
+ new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+
+ cursor.fieldsMeta(res.metaData());
+
+ return cursor;
+ }
+ });
}
catch (IgniteCheckedException e) {
throw new CacheException(e);
@@ -793,7 +817,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (type == null || !type.registered())
throw new CacheException("Failed to find SQL table for type: " + resType);
- return idx.queryText(space, clause, type, filters);
+ return idx.queryText(
+ space,
+ clause,
+ type,
+ filters);
}
finally {
busyLock.leaveBusy();
@@ -808,7 +836,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Field rows.
* @throws IgniteCheckedException If failed.
*/
- public <K, V> GridQueryFieldsResult queryFields(@Nullable String space, String clause, Collection<Object> params,
+ public GridQueryFieldsResult queryFields(@Nullable String space, String clause, Collection<Object> params,
IndexingQueryFilter filters) throws IgniteCheckedException {
checkEnabled();
@@ -837,7 +865,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (ctx.indexing().enabled()) {
CacheObjectContext coctx = cacheObjectContext(spaceName);
- ctx.indexing().onSwap(spaceName, key.value(coctx, false));
+ ctx.indexing().onSwap(
+ spaceName,
+ key.value(
+ coctx,
+ false));
}
if (idx == null)
@@ -847,7 +879,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to process swap event (grid is stopping).");
try {
- idx.onSwap(spaceName, key);
+ idx.onSwap(
+ spaceName,
+ key);
}
finally {
busyLock.leaveBusy();
@@ -1067,7 +1101,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
assert valCls != null;
for (Map.Entry<String, Class<?>> entry : meta.getAscendingFields().entrySet()) {
- ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue());
+ ClassProperty prop = buildClassProperty(
+ keyCls,
+ valCls,
+ entry.getKey(),
+ entry.getValue());
d.addProperty(prop, false);
@@ -1079,7 +1117,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
for (Map.Entry<String, Class<?>> entry : meta.getDescendingFields().entrySet()) {
- ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue());
+ ClassProperty prop = buildClassProperty(
+ keyCls,
+ valCls,
+ entry.getKey(),
+ entry.getValue());
d.addProperty(prop, false);
@@ -1091,7 +1133,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
for (String txtIdx : meta.getTextFields()) {
- ClassProperty prop = buildClassProperty(keyCls, valCls, txtIdx, String.class);
+ ClassProperty prop = buildClassProperty(
+ keyCls,
+ valCls,
+ txtIdx,
+ String.class);
d.addProperty(prop, false);
@@ -1109,7 +1155,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
int order = 0;
for (Map.Entry<String, IgniteBiTuple<Class<?>, Boolean>> idxField : idxFields.entrySet()) {
- ClassProperty prop = buildClassProperty(keyCls, valCls, idxField.getKey(), idxField.getValue().get1());
+ ClassProperty prop = buildClassProperty(
+ keyCls,
+ valCls,
+ idxField.getKey(),
+ idxField.getValue().get1());
d.addProperty(prop, false);
@@ -1123,7 +1173,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
for (Map.Entry<String, Class<?>> entry : meta.getQueryFields().entrySet()) {
- ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue());
+ ClassProperty prop = buildClassProperty(
+ keyCls,
+ valCls,
+ entry.getKey(),
+ entry.getValue());
d.addProperty(prop, false);
}
@@ -1231,7 +1285,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*/
private static ClassProperty buildClassProperty(Class<?> keyCls, Class<?> valCls, String pathStr, Class<?> resType)
throws IgniteCheckedException {
- ClassProperty res = buildClassProperty(true, keyCls, pathStr, resType);
+ ClassProperty res = buildClassProperty(
+ true,
+ keyCls,
+ pathStr,
+ resType);
if (res == null) // We check key before value consistently with PortableProperty.
res = buildClassProperty(false, valCls, pathStr, resType);
@@ -1330,6 +1388,59 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param cctx Cache context.
+ * @param clo Closure.
+ */
+ private <R> R executeQuery(GridCacheContext<?,?> cctx, IgniteOutClosureX<R> clo)
+ throws IgniteCheckedException {
+ final long start = U.currentTimeMillis();
+
+ Throwable err = null;
+
+ R res = null;
+
+ try {
+ res = clo.apply();
+
+ return res;
+ }
+ catch (GridClosureException e) {
+ err = e.unwrap();
+
+ throw (IgniteCheckedException)err;
+ }
+ finally {
+ GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();
+
+ onExecuted(cctx, metrics, res, err, start, U.currentTimeMillis() - start, log);
+ }
+ }
+
+ /**
+ * @param cctx Cctx.
+ * @param metrics Metrics.
+ * @param res Result.
+ * @param err Err.
+ * @param startTime Start time.
+ * @param duration Duration.
+ * @param log Logger.
+ */
+ public static void onExecuted(GridCacheContext<?, ?> cctx, GridCacheQueryMetricsAdapter metrics,
+ Object res, Throwable err, long startTime, long duration, IgniteLogger log) {
+ boolean fail = err != null;
+
+ // Update own metrics.
+ metrics.onQueryExecute(duration, fail);
+
+ // Update metrics in query manager.
+ cctx.queries().onMetricsUpdate(duration, fail);
+
+ if (log.isTraceEnabled())
+ log.trace("Query execution finished [startTime=" + startTime +
+ ", duration=" + duration + ", fail=" + (err != null) + ", res=" + res + ']');
+ }
+
+ /**
*
*/
private abstract static class Property {
@@ -1538,9 +1649,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*/
private static class TypeDescriptor implements GridQueryTypeDescriptor {
/** */
- private CacheConfiguration<?,?> ccfg;
-
- /** */
private String name;
/** Value field names and types with preserved order. */
@@ -1571,13 +1679,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
private boolean registered;
/**
- * @param ccfg Cache configuration.
- */
- private TypeDescriptor(CacheConfiguration<?,?> ccfg) {
- this.ccfg = ccfg;
- }
-
- /**
* @return {@code True} if type registration in SPI was finished and type was not rejected.
*/
boolean registered() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 22d1ff0..64eb1c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
@@ -69,7 +70,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>();
/** Deployment executor service. */
- private final ExecutorService depExe = Executors.newSingleThreadExecutor();
+ private final ExecutorService depExe;
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -97,6 +98,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
*/
public GridServiceProcessor(GridKernalContext ctx) {
super(ctx);
+
+ depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.gridName(), "srvc-deploy"));
}
/** {@inheritDoc} */
@@ -128,10 +131,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
ctx.cache().context().deploy().ignoreOwnership(true);
cfgQryId = cache.context().continuousQueries().executeInternalQuery(
- new DeploymentListener(), null, true, true);
+ new DeploymentListener(), null, cache.context().affinityNode(), true);
assignQryId = cache.context().continuousQueries().executeInternalQuery(
- new AssignmentListener(), null, true, true);
+ new AssignmentListener(), null, cache.context().affinityNode(), true);
}
finally {
if (ctx.deploy().enabled())
@@ -345,7 +348,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
"different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']'));
}
else {
- for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+ Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
+ ServiceAssignmentsPredicate.INSTANCE);
+
+ while (it.hasNext()) {
+ Cache.Entry<Object, Object> e = it.next();
+
if (e.getKey() instanceof GridServiceAssignmentsKey) {
GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
@@ -437,7 +445,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
public IgniteInternalFuture<?> cancelAll() {
Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
- for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+ Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
+
+ while (it.hasNext()) {
+ Cache.Entry<Object, Object> e = it.next();
+
if (!(e.getKey() instanceof GridServiceDeploymentKey))
continue;
@@ -456,7 +468,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
public Collection<ServiceDescriptor> serviceDescriptors() {
Collection<ServiceDescriptor> descs = new ArrayList<>();
- for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+ Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
+
+ while (it.hasNext()) {
+ Cache.Entry<Object, Object> e = it.next();
+
if (!(e.getKey() instanceof GridServiceDeploymentKey))
continue;
@@ -904,6 +920,43 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
/**
+ * @param p Entry predicate used to execute query from client node.
+ * @return Service deployment entries.
+ */
+ @SuppressWarnings("unchecked")
+ private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
+ if (!cache.context().affinityNode()) {
+ ClusterNode oldestSrvNode =
+ CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+
+ if (oldestSrvNode == null)
+ return F.emptyIterator();
+
+ GridCacheQueryManager qryMgr = cache.context().queries();
+
+ CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, false);
+
+ qry.keepAll(false);
+
+ qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+
+ return cache.context().itHolder().iterator(qry.execute(),
+ new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() {
+ @Override protected Object convert(Map.Entry<Object, Object> e) {
+ return new CacheEntryImpl<>(e.getKey(), e.getValue());
+ }
+
+ @Override protected void remove(Object item) {
+ throw new UnsupportedOperationException();
+ }
+ }
+ );
+ }
+ else
+ return cache.entrySetx().iterator();
+ }
+
+ /**
* Service deployment listener.
*/
private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
@@ -1045,18 +1098,24 @@ public class GridServiceProcessor extends GridProcessorAdapter {
try {
depExe.submit(new BusyRunnable() {
@Override public void run0() {
- long topVer = ((DiscoveryEvent)evt).topologyVersion();
+ AffinityTopologyVersion topVer =
+ new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion());
- ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
- if (oldest.isLocal()) {
+ if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
if (ctx.deploy().enabled())
ctx.cache().context().deploy().ignoreOwnership(true);
try {
- for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+ Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
+ ServiceDeploymentPredicate.INSTANCE);
+
+ while (it.hasNext()) {
+ Cache.Entry<Object, Object> e = it.next();
+
if (!(e.getKey() instanceof GridServiceDeploymentKey))
continue;
@@ -1068,7 +1127,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
affinityReadyFuture(topVer).get();
- reassign(dep, topVer);
+ reassign(dep, topVer.topologyVersion());
}
catch (IgniteCheckedException ex) {
if (!(e instanceof ClusterTopologyCheckedException))
@@ -1085,7 +1144,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
if (!retries.isEmpty())
- onReassignmentFailed(topVer, retries);
+ onReassignmentFailed(topVer.topologyVersion(), retries);
}
// Clean up zombie assignments.
@@ -1265,4 +1324,46 @@ public class GridServiceProcessor extends GridProcessorAdapter {
*/
public abstract void run0();
}
+
+ /**
+ *
+ */
+ static class ServiceDeploymentPredicate implements IgniteBiPredicate<Object, Object> {
+ /** */
+ static final ServiceDeploymentPredicate INSTANCE = new ServiceDeploymentPredicate();
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Object key, Object val) {
+ return key instanceof GridServiceDeploymentKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ServiceDeploymentPredicate.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ static class ServiceAssignmentsPredicate implements IgniteBiPredicate<Object, Object> {
+ /** */
+ static final ServiceAssignmentsPredicate INSTANCE = new ServiceAssignmentsPredicate();
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Object key, Object val) {
+ return key instanceof GridServiceAssignmentsKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ServiceAssignmentsPredicate.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
new file mode 100644
index 0000000..a0fd9b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.timeout;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Wrapper for {@link IgniteSpiTimeoutObject}.
+ */
+public class GridSpiTimeoutObject implements GridTimeoutObject {
+ /** */
+ @GridToStringInclude
+ private final IgniteSpiTimeoutObject obj;
+
+ /**
+ * @param obj SPI object.
+ */
+ public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) {
+ this.obj = obj;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ obj.onTimeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return obj.id();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return obj.endTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ assert false;
+
+ return super.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ assert false;
+
+ return super.equals(obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final String toString() {
+ return S.toString(GridSpiTimeoutObject.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 81ff72b..e4f370c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,11 +21,14 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.thread.*;
+import java.io.*;
import java.util.*;
/**
@@ -40,10 +43,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() {
/** {@inheritDoc} */
@Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
- long time1 = o1.endTime();
- long time2 = o2.endTime();
+ int res = Long.compare(o1.endTime(), o2.endTime());
- return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId());
+ if (res != 0)
+ return res;
+
+ return o1.timeoutId().compareTo(o2.timeoutId());
}
});
@@ -98,6 +103,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
}
/**
+ * Schedule the specified timer task for execution at the specified
+ * time with the specified period, in milliseconds.
+ *
+ * @param task Task to execute.
+ * @param delay Delay to first execution in milliseconds.
+ * @param period Period for execution in milliseconds or -1.
+ * @return Cancelable to cancel task.
+ */
+ public CancelableTask schedule(Runnable task, long delay, long period) {
+ assert delay >= 0 : delay;
+ assert period > 0 || period == -1 : period;
+
+ CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
+
+ addTimeoutObject(obj);
+
+ return obj;
+ }
+
+ /**
* @param timeoutObj Timeout object.
*/
public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
@@ -173,4 +198,78 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']');
X.println(">>> timeoutObjsSize: " + timeoutObjs.size());
}
+
+ /**
+ *
+ */
+ public class CancelableTask implements GridTimeoutObject, Closeable {
+ /** */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** */
+ private long endTime;
+
+ /** */
+ private final long period;
+
+ /** */
+ private volatile boolean cancel;
+
+ /** */
+ @GridToStringInclude
+ private final Runnable task;
+
+ /**
+ * @param task Task to execute.
+ * @param firstTime First time.
+ * @param period Period.
+ */
+ CancelableTask(Runnable task, long firstTime, long period) {
+ this.task = task;
+ endTime = firstTime;
+ this.period = period;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void onTimeout() {
+ if (cancel)
+ return;
+
+ try {
+ task.run();
+ }
+ finally {
+ if (!cancel && period > 0) {
+ endTime = U.currentTimeMillis() + period;
+
+ addTimeoutObject(this);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ cancel = true;
+
+ synchronized (this) {
+ // Just waiting for task execution end to make sure that task will not be executed anymore.
+ removeTimeoutObject(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CancelableTask.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
index 1d1e022..f8ee265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
@@ -36,6 +36,15 @@ public class IgniteTxRollbackCheckedException extends IgniteCheckedException {
}
/**
+ * Creates new exception with given nested exception.
+ *
+ * @param cause Nested exception.
+ */
+ public IgniteTxRollbackCheckedException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
* Creates new rollback exception with given error message and optional nested exception.
*
* @param msg Error message.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index bff26ec..42fe089 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -128,25 +128,31 @@ public final class GridJavaProcess {
gjProc.log = log;
gjProc.procKilledC = procKilledC;
- String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
- String classpath = System.getProperty("java.class.path");
- String sfcp = System.getProperty("surefire.test.class.path");
-
- if (sfcp != null)
- classpath += System.getProperty("path.separator") + sfcp;
-
- if (cp != null)
- classpath += System.getProperty("path.separator") + cp;
-
List<String> procParams = params == null || params.isEmpty() ?
Collections.<String>emptyList() : Arrays.asList(params.split(" "));
List<String> procCommands = new ArrayList<>();
+ String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+
procCommands.add(javaBin);
procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);
- procCommands.add("-cp");
- procCommands.add(classpath);
+
+ if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) {
+ String classpath = System.getProperty("java.class.path");
+
+ String sfcp = System.getProperty("surefire.test.class.path");
+
+ if (sfcp != null)
+ classpath += System.getProperty("path.separator") + sfcp;
+
+ if (cp != null)
+ classpath += System.getProperty("path.separator") + cp;
+
+ procCommands.add("-cp");
+ procCommands.add(classpath);
+ }
+
procCommands.add(clsName);
procCommands.addAll(procParams);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index fb9ad29..f8caf22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -241,8 +241,8 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
lsnr.apply(this);
}
catch (IllegalStateException e) {
- U.warn(null, "Failed to notify listener (is grid stopped?) [fut=" + this +
- ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']');
+ U.error(null, "Failed to notify listener (is grid stopped?) [fut=" + this +
+ ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']', e);
}
catch (RuntimeException | Error e) {
U.error(null, "Failed to notify listener: " + lsnr, e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 31396fb..693a5a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -38,58 +38,58 @@ public interface GridCommunicationClient {
* @param handshakeC Handshake.
* @throws IgniteCheckedException If handshake failed.
*/
- void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
+ public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
/**
* @return {@code True} if client has been closed by this call,
* {@code false} if failed to close client (due to concurrent reservation or concurrent close).
*/
- boolean close();
+ public boolean close();
/**
* Forces client close.
*/
- void forceClose();
+ public void forceClose();
/**
* @return {@code True} if client is closed;
*/
- boolean closed();
+ public boolean closed();
/**
* @return {@code True} if client was reserved, {@code false} otherwise.
*/
- boolean reserve();
+ public boolean reserve();
/**
* Releases this client by decreasing reservations.
*/
- void release();
+ public void release();
/**
* @return {@code True} if client was reserved.
*/
- boolean reserved();
+ public boolean reserved();
/**
* Gets idle time of this client.
*
* @return Idle time of this client.
*/
- long getIdleTime();
+ public long getIdleTime();
/**
* @param data Data to send.
* @throws IgniteCheckedException If failed.
*/
- void sendMessage(ByteBuffer data) throws IgniteCheckedException;
+ public void sendMessage(ByteBuffer data) throws IgniteCheckedException;
/**
* @param data Data to send.
* @param len Length.
* @throws IgniteCheckedException If failed.
*/
- void sendMessage(byte[] data, int len) throws IgniteCheckedException;
+ public void sendMessage(byte[] data, int len) throws IgniteCheckedException;
/**
* @param nodeId Node ID (provided only if versions of local and remote nodes are different).
@@ -97,16 +97,10 @@ public interface GridCommunicationClient {
* @throws IgniteCheckedException If failed.
* @return {@code True} if should try to resend message.
*/
- boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
-
- /**
- * @param timeout Timeout.
- * @throws IOException If failed.
- */
- void flushIfNeeded(long timeout) throws IOException;
+ public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
/**
* @return {@code True} if send is asynchronous.
*/
- boolean async();
+ public boolean async();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
index 2b764ec..44ab4a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -85,7 +85,7 @@ public class GridNioDelimitedBuffer {
idx++;
}
else {
- pos = cnt - idx;
+ pos = cnt - (i - pos) - 1;
idx = 0;
}