You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/16 13:36:40 UTC

[01/13] ignite git commit: IGNITE-4641 - Refresh client attributes during reconnect

Repository: ignite
Updated Branches:
  refs/heads/master 94c1e7cb5 -> 84880a810


IGNITE-4641 - Refresh client attributes during reconnect


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8874f99f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8874f99f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8874f99f

Branch: refs/heads/master
Commit: 8874f99f44dc2edf08a525619edb49d5db70b938
Parents: 0a43665
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Feb 14 18:44:57 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Feb 14 18:44:57 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |   4 +
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   5 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |   6 +
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   2 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   5 +
 .../tcp/internal/TcpDiscoveryNode.java          |   6 +-
 ...veryNodeAttributesUpdateOnReconnectTest.java | 110 ++++++++++++++++++
 .../tcp/TestReconnectPluginProvider.java        | 111 +++++++++++++++++++
 .../discovery/tcp/TestReconnectProcessor.java   |  93 ++++++++++++++++
 .../testframework/GridSpiTestContext.java       |   5 +
 .../IgniteSpiDiscoverySelfTestSuite.java        |   3 +
 .../org.apache.ignite.plugin.PluginProvider     |   1 +
 parent/pom.xml                                  |   1 +
 13 files changed, 349 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 584cc56..25cc715 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -569,6 +569,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
                     }
 
+                    @Override public Map<String, Object> nodeAttributes() {
+                        return ctx.nodeAttributes();
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 219d07b..8879364 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -928,5 +928,10 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
             ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Object> nodeAttributes() {
+            return Collections.emptyMap();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 5eb5227..96b3e61 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
@@ -352,4 +353,9 @@ public interface IgniteSpiContext {
      * @param c Timeout object.
      */
     public void removeTimeoutObject(IgniteSpiTimeoutObject c);
+
+    /**
+     * @return Current node attributes.
+     */
+    public Map<String, Object> nodeAttributes();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 39c539c..932e7d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -628,7 +628,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     TcpDiscoveryNode node = locNode;
 
                     if (locNode.order() > 0)
-                        node = locNode.clientReconnectNode();
+                        node = locNode.clientReconnectNode(spi.spiCtx.nodeAttributes());
 
                     msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 45933e1..00ae97d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -409,6 +409,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** */
     private boolean clientReconnectDisabled;
 
+    /** SPI context, assigned in {@link #onContextInitialized0(IgniteSpiContext)}. */
+    protected IgniteSpiContext spiCtx;
+
     /** {@inheritDoc} */
     @Override public String getSpiState() {
         return impl.getSpiState();
@@ -1161,6 +1164,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
         super.onContextInitialized0(spiCtx);
 
+        this.spiCtx = spiCtx;
+
         ctxInitLatch.countDown();
 
         ipFinder.onSpiContextInitialized(spiCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 307aefe..d8b1fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -513,13 +514,14 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     }
 
     /**
+     * @param nodeAttrs Current node attributes.
      * @return Copy of local node for client reconnect request.
      */
-    public TcpDiscoveryNode clientReconnectNode() {
+    public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs) {
         TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver,
             null);
 
-        node.attrs = attrs;
+        node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs));
         node.clientRouterNodeId = clientRouterNodeId;
 
         return node;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
new file mode 100644
index 0000000..56dc4ec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.reconnectClientNode;
+
+/**
+ * Checks that, on client reconnect, the current node attributes from the kernal context are sent.
+ */
+public class TcpDiscoveryNodeAttributesUpdateOnReconnectTest extends GridCommonAbstractTest {
+    /** Value of the "test" attribute of the event node from the latest EVT_NODE_JOINED event. */
+    private volatile String rejoinAttr;
+
+    /** */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.contains("client")) {
+            Map<String, String> attrs = new HashMap<>();
+
+            attrs.put("test", "1");
+
+            cfg.setUserAttributes(attrs);
+            cfg.setClientMode(true);
+        }
+
+        IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi spi = new IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi();
+
+        TcpDiscoveryIpFinder finder = ((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder();
+
+        spi.setIpFinder(finder);
+
+        cfg.setDiscoverySpi(spi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        TestReconnectPluginProvider.enabled = false;
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        TestReconnectPluginProvider.enabled = true;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnect() throws Exception {
+        Ignite srv = startGrid("server");
+
+        IgniteEvents evts = srv.events();
+
+        evts.enableLocal(EventType.EVTS_DISCOVERY_ALL);
+        evts.localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                ClusterNode node = ((DiscoveryEvent)evt).eventNode();
+
+                rejoinAttr = node.attribute("test");
+
+                return true;
+            }
+        }, EventType.EVT_NODE_JOINED);
+
+        Ignite client = startGrid("client");
+
+        reconnectClientNode(log, client, srv, null);
+
+        assertEquals("2", rejoinAttr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java
new file mode 100644
index 0000000..692774c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Creates TestReconnectProcessor.
+ */
+public class TestReconnectPluginProvider implements PluginProvider {
+    /** */
+    private GridKernalContext igniteCtx;
+
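+    /** Whether {@link #createComponent} returns a {@link TestReconnectProcessor} as the security processor. */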
+    public static volatile boolean enabled;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "TestReconnectPlugin";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+        igniteCtx = ((IgniteKernal)ctx.grid()).context();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() throws IgniteCheckedException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object createComponent(PluginContext ctx, Class cls) {
+        if (enabled && GridSecurityProcessor.class.equals(cls))
+            return new TestReconnectProcessor(igniteCtx);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePlugin plugin() {
+        return new IgnitePlugin() {};
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java
new file mode 100644
index 0000000..f0ed35c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.plugin.security.AuthenticationContext;
+import org.apache.ignite.plugin.security.SecurityCredentials;
+import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.plugin.security.SecuritySubject;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stub security processor that sets the node attribute {@code test} to {@code "2"} on disconnect.
+ */
+public class TestReconnectProcessor extends GridProcessorAdapter implements GridSecurityProcessor {
+    /**
+     * @param ctx Kernal context.
+     */
+    protected TestReconnectProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SecurityContext authenticateNode(ClusterNode node,
+        SecurityCredentials cred) throws IgniteCheckedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isGlobalNodeAuthentication() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SecurityContext authenticate(AuthenticationContext ctx) throws IgniteCheckedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<SecuritySubject> authenticatedSubjects() throws IgniteCheckedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SecuritySubject authenticatedSubject(UUID subjId) throws IgniteCheckedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void authorize(String name, SecurityPermission perm,
+        @Nullable SecurityContext securityCtx) throws SecurityException {
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionExpired(UUID subjId) {
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean enabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        ctx.addNodeAttribute("test", "2");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index ac50ef9..1c8acbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -601,6 +601,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
             timeoutProcessor.removeTimeoutObject(new GridSpiTimeoutObject(obj));
     }
 
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> nodeAttributes() {
+        return Collections.emptyMap();
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 5f870a4..548e1a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest;
@@ -84,6 +85,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpDiscoveryRestartTest.class));
         suite.addTest(new TestSuite(TcpDiscoveryMultiThreadedTest.class));
 
+        suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class));
+
         // SSL.
         suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
----------------------------------------------------------------------
diff --git a/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
new file mode 100644
index 0000000..a7fdf43
--- /dev/null
+++ b/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -0,0 +1 @@
+org.apache.ignite.spi.discovery.tcp.TestReconnectPluginProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 88532a7..8bf5dde 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -821,6 +821,7 @@
                                         <exclude>src/main/java/META-INF/services/javax.cache.spi.CachingProvider</exclude><!--cannot be changed-->
                                         <exclude>src/main/java/org/jetbrains/annotations/*.java</exclude><!--copyright-->
                                         <exclude>src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider</exclude><!--cannot be changed-->
+                                        <exclude>/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider</exclude>
                                         <exclude>dev-tools/IGNITE-*.patch</exclude>
                                         <exclude>dev-tools/.gradle/**/*</exclude>
                                         <exclude>dev-tools/gradle/wrapper/**/*</exclude>


[10/13] ignite git commit: IGNITE-4717 Fixed hangs in VisorCacheClearTask. (cherry picked from commit 76f3060)

Posted by nt...@apache.org.
IGNITE-4717 Fixed hangs in VisorCacheClearTask.
(cherry picked from commit 76f3060)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a30183ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a30183ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a30183ac

Branch: refs/heads/master
Commit: a30183ac821507fbdaa6f0cc2c6ef25ca2677867
Parents: 5736247
Author: Andrey Novikov <an...@gridgain.com>
Authored: Mon Feb 20 18:23:33 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 2 10:47:15 2017 +0700

----------------------------------------------------------------------
 .../visor/cache/VisorCacheClearTask.java        | 88 +++++---------------
 .../visor/compute/VisorGatewayTask.java         | 30 ++++++-
 2 files changed, 49 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a30183ac/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 1f1a6fb..0c8476f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.visor.cache;
 
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.resources.JobContextResource;
@@ -90,17 +88,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
         }
 
         /**
-         * @param subJob Sub job to execute asynchronously.
+         * @param fut Future for asynchronous cache operation.
          * @param idx Index.
          * @return {@code true} If subJob was not completed and this job should be suspended.
          */
-        private boolean callAsync(IgniteCallable<Integer> subJob, int idx) {
-            IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync();
-
-            compute.call(subJob);
-
-            IgniteFuture<Integer> fut = compute.future();
-
+        private boolean callAsync(IgniteFuture<Integer> fut, int idx) {
             futs[idx] = fut;
 
             if (fut.isDone())
@@ -119,16 +111,28 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
                 futs = new IgniteFuture[3];
 
             if (futs[0] == null || futs[1] == null || futs[2] == null) {
-                IgniteCache cache = ignite.cache(cacheName);
+                IgniteCache cache = ignite.cache(cacheName).withAsync();
+
+                if (futs[0] == null) {
+                    cache.size(CachePeekMode.PRIMARY);
+
+                    if (callAsync(cache.<Integer>future(), 0))
+                        return null;
+                }
 
-                if (futs[0] == null && callAsync(new VisorCacheSizeCallable(cache), 0))
-                    return null;
+                if (futs[1] == null) {
+                    cache.clear();
 
-                if (futs[1] == null && callAsync(new VisorCacheClearCallable(cache), 1))
-                    return null;
+                    if (callAsync(cache.<Integer>future(), 1))
+                        return null;
+                }
+                
+                if (futs[2] == null) {
+                    cache.size(CachePeekMode.PRIMARY);
 
-                if (futs[2] == null && callAsync(new VisorCacheSizeCallable(cache), 2))
-                    return null;
+                    if (callAsync(cache.<Integer>future(), 2))
+                        return null;
+                }
             }
 
             assert futs[0].isDone() && futs[1].isDone() && futs[2].isDone();
@@ -141,54 +145,4 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
             return S.toString(VisorCacheClearJob.class, this);
         }
     }
-
-    /**
-     * Callable to get cache size.
-     */
-    @GridInternal
-    private static class VisorCacheSizeCallable implements IgniteCallable<Integer> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteCache cache;
-
-        /**
-         * @param cache Cache to take size from.
-         */
-        private VisorCacheSizeCallable(IgniteCache cache) {
-            this.cache = cache;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer call() throws Exception {
-            return cache.size(CachePeekMode.PRIMARY);
-        }
-    }
-
-    /**
-     * Callable to clear cache.
-     */
-    @GridInternal
-    private static class VisorCacheClearCallable implements IgniteCallable<Integer> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteCache cache;
-
-        /**
-         * @param cache Cache to clear.
-         */
-        private VisorCacheClearCallable(IgniteCache cache) {
-            this.cache = cache;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer call() throws Exception {
-            cache.clear();
-
-            return 0;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a30183ac/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index 2539a26..a64ec6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@ -29,21 +29,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -101,9 +106,16 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
         @IgniteInstanceResource
         protected transient IgniteEx ignite;
 
+        /** Auto-inject job context. */
+        @JobContextResource
+        protected transient ComputeJobContext jobCtx;
+
         /** Arguments count. */
         private final int argsCnt;
 
+        /** Future for spawned task. */
+        private transient IgniteFuture fut;
+
         /**
          * Create job with specified argument.
          *
@@ -284,6 +296,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public Object execute() throws IgniteException {
+            if (fut != null)
+                return fut.get();
+
             String nidsArg = argument(0);
             String taskName = argument(1);
 
@@ -355,8 +370,19 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
                 }
             }
 
-            return ignite.compute(ignite.cluster().forNodeIds(nids))
-                .execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+            IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync();
+            
+            comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+
+            fut = comp.future();
+
+            fut.listen(new CI1<IgniteFuture<Object>>() {
+                @Override public void apply(IgniteFuture<Object> f) {
+                    jobCtx.callcc();
+                }
+            });
+
+            return jobCtx.holdcc();
         }
     }
 }


[12/13] ignite git commit: IGNITE-4717 VisorClearTask minor fix. (cherry picked from commit d4b87f4)

Posted by nt...@apache.org.
IGNITE-4717 VisorClearTask minor fix.
(cherry picked from commit d4b87f4)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcb13982
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcb13982
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcb13982

Branch: refs/heads/master
Commit: bcb139822afa148a7ea3fbb3eecc274f308070f6
Parents: 7ad8e79
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Mar 10 15:51:38 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Mar 10 16:01:45 2017 +0700

----------------------------------------------------------------------
 .../visor/cache/VisorCacheClearTask.java        | 57 +++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb13982/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 0c8476f..ce74f17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.resources.JobContextResource;
@@ -145,4 +146,58 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
             return S.toString(VisorCacheClearJob.class, this);
         }
     }
-}
\ No newline at end of file
+
+    /**
+     * Callable to get cache size.
+     *
+     * @deprecated This class is needed only for compatibility.
+     */
+    @GridInternal @Deprecated
+    private static class VisorCacheSizeCallable implements IgniteCallable<Integer> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final IgniteCache cache;
+
+        /**
+         * @param cache Cache to take size from.
+         */
+        private VisorCacheSizeCallable(IgniteCache cache) {
+            this.cache = cache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            return cache.size(CachePeekMode.PRIMARY);
+        }
+    }
+
+    /**
+     * Callable to clear cache.
+     *
+     * @deprecated This class is needed only for compatibility.
+     */
+    @GridInternal @Deprecated
+    private static class VisorCacheClearCallable implements IgniteCallable<Integer> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final IgniteCache cache;
+
+        /**
+         * @param cache Cache to clear.
+         */
+        private VisorCacheClearCallable(IgniteCache cache) {
+            this.cache = cache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            cache.clear();
+
+            return 0;
+        }
+    }
+}


[02/13] ignite git commit: IGNITE-3429: Added BinaryResolver configuration samples for org.hibernate.cache.spi.CacheKey. This closes #1516.

Posted by nt...@apache.org.
IGNITE-3429: Added BinaryResolver configuration samples for  org.hibernate.cache.spi.CacheKey. This closes #1516.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05788b31
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05788b31
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05788b31

Branch: refs/heads/master
Commit: 05788b3188b30b5a3b39a75fe66301e03658408f
Parents: 8874f99
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Feb 17 12:14:53 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Feb 17 12:14:53 2017 +0300

----------------------------------------------------------------------
 .../Hibernate5CacheKeyTypeConfiguration.java    | 52 ++++++++++++++++++++
 .../HibernateCacheKeyTypeConfiguration.java     | 51 +++++++++++++++++++
 2 files changed, 103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/05788b31/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
new file mode 100644
index 0000000..886f69b
--- /dev/null
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.hibernate.config;
+
+import java.util.Objects;
+import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
+import org.apache.ignite.binary.BinaryIdentityResolver;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+
+/**
+ * This configuration provides a correct {@link BinaryIdentityResolver} implementation
+ * so that the Hibernate CacheKey class can be used as a key object.
+ *
+ * Note: for Hibernate version < 5.0 {@link HibernateCacheKeyTypeConfiguration} should be used.
+
+ */
+public class Hibernate5CacheKeyTypeConfiguration extends BinaryTypeConfiguration {
+
+    /** {@inheritDoc} */
+    public Hibernate5CacheKeyTypeConfiguration() {
+        super("org.hibernate.cache.internal.CacheKeyImplementation");
+
+        setIdentityResolver(new BinaryAbstractIdentityResolver() {
+            @Override protected int hashCode0(BinaryObject obj) {
+                return obj.field("id").hashCode();
+            }
+
+            @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
+                Object obj0 = o1.field("id");
+                Object obj1 = o2.field("id");
+
+                return Objects.equals(obj0, obj1);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05788b31/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
new file mode 100644
index 0000000..c54292e
--- /dev/null
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.hibernate.config;
+
+import java.util.Objects;
+import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
+import org.apache.ignite.binary.BinaryIdentityResolver;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+
+/**
+ * This configuration provides a correct {@link BinaryIdentityResolver} implementation
+ * so that the Hibernate CacheKey class can be used as a key object.
+ *
+ * Note: for Hibernate version >= 5.0 {@link Hibernate5CacheKeyTypeConfiguration} should be used.
+ */
+public class HibernateCacheKeyTypeConfiguration extends BinaryTypeConfiguration {
+
+    /** {@inheritDoc} */
+    public HibernateCacheKeyTypeConfiguration() {
+        super("org.hibernate.cache.spi.CacheKey");
+
+        setIdentityResolver(new BinaryAbstractIdentityResolver() {
+            @Override protected int hashCode0(BinaryObject obj) {
+                return obj.field("key").hashCode();
+            }
+
+            @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
+                Object obj0 = o1.field("key");
+                Object obj1 = o2.field("key");
+
+                return Objects.equals(obj0, obj1);
+            }
+        });
+    }
+}


[04/13] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.7.8' into ignite-1.7.8

Posted by nt...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.7.8' into ignite-1.7.8


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/382fbc9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/382fbc9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/382fbc9d

Branch: refs/heads/master
Commit: 382fbc9d0b55f794eb4a9045fe72ca06b480062f
Parents: 1f881aa 05788b3
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Feb 17 12:35:18 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Feb 17 12:35:18 2017 +0300

----------------------------------------------------------------------
 .../Hibernate5CacheKeyTypeConfiguration.java    | 52 ++++++++++++++++++++
 .../HibernateCacheKeyTypeConfiguration.java     | 51 +++++++++++++++++++
 2 files changed, 103 insertions(+)
----------------------------------------------------------------------



[03/13] ignite git commit: IGNITE-4147 - Throw exception on connecting node to cluster with different SSL configuration

Posted by nt...@apache.org.
IGNITE-4147 - Throw exception on connecting node to cluster with different SSL configuration


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f881aa7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f881aa7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f881aa7

Branch: refs/heads/master
Commit: 1f881aa70a3894af01135f4cc5e341a8130462c2
Parents: 8874f99
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Feb 17 12:34:41 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Feb 17 12:34:41 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  30 ++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  19 ++
 .../TcpDiscoverySslSecuredUnsecuredTest.java    | 185 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   2 +
 4 files changed, 235 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 932e7d1..95e2cda 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.StreamCorruptedException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
@@ -44,6 +45,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
@@ -587,6 +589,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         int connectAttempts = 1;
 
+        int sslConnectAttempts = 3;
+
         UUID locNodeId = getLocalNodeId();
 
         IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -662,6 +666,22 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (X.hasCause(e, SSLException.class)) {
+                    if (--sslConnectAttempts == 0)
+                        throw new IgniteSpiException("Unable to establish secure connection. " +
+                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+                    continue;
+                }
+
+                if (X.hasCause(e, StreamCorruptedException.class)) {
+                    if (--sslConnectAttempts == 0)
+                        throw new IgniteSpiException("Unable to establish plain connection. " +
+                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+                    continue;
+                }
+
                 if (timeoutHelper.checkFailureTimeoutReached(e))
                     break;
 
@@ -1593,7 +1613,15 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             joinCnt++;
 
-            T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+            T2<SocketStream, Boolean> joinRes;
+            try {
+                joinRes = joinTopology(false, spi.joinTimeout);
+            }
+            catch (IgniteSpiException e) {
+                joinError(e);
+
+                return;
+            }
 
             if (joinRes == null) {
                 if (join)

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d462ac2..4600be0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.ObjectStreamException;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.StreamCorruptedException;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -1109,6 +1110,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         int connectAttempts = 1;
 
+        int sslConnectAttempts = 3;
+
         boolean joinReqSent;
 
         UUID locNodeId = getLocalNodeId();
@@ -1220,6 +1223,22 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 errs.add(e);
 
+                if (X.hasCause(e, SSLException.class)) {
+                    if (--sslConnectAttempts == 0)
+                        throw new IgniteException("Unable to establish secure connection. " +
+                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+                    continue;
+                }
+
+                if (X.hasCause(e, StreamCorruptedException.class)) {
+                    if (--sslConnectAttempts == 0)
+                        throw new IgniteException("Unable to establish plain connection. " +
+                            "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+                    continue;
+                }
+
                 if (timeoutHelper.checkFailureTimeoutReached(e))
                     break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
new file mode 100644
index 0000000..ca34f77
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StreamCorruptedException;
+import java.net.Socket;
+import java.util.concurrent.Callable;
+import javax.net.ssl.SSLException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests cases when a node connects to a cluster with a different SSL configuration.
+ * An exception with a meaningful message should be thrown.
+ */
+public class TcpDiscoverySslSecuredUnsecuredTest extends GridCommonAbstractTest {
+    /** */
+    private volatile TcpDiscoverySpi spi;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(gridName.contains("client"));
+
+        if (gridName.contains("ssl"))
+            cfg.setSslContextFactory(GridTestUtils.sslFactory());
+
+        if (spi != null) {
+            final TcpDiscoveryIpFinder finder = ((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder();
+
+            spi.setIpFinder(finder);
+
+            cfg.setDiscoverySpi(spi);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSecuredUnsecuredServerConnection() throws Exception {
+        checkConnection("plain-server", "ssl-server");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUnsecuredSecuredServerConnection() throws Exception {
+        checkConnection("ssl-server", "plain-server");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSecuredClientUnsecuredServerConnection() throws Exception {
+        checkConnection("plain-server", "ssl-client");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUnsecuredClientSecuredServerConnection() throws Exception {
+        checkConnection("ssl-server", "plain-client");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPlainServerNodesRestart() throws Exception {
+        checkNodesRestart("plain-server-1", "plain-server-2");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSslServerNodesRestart() throws Exception {
+        checkNodesRestart("ssl-server-1", "ssl-server-2");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPlainClientNodesRestart() throws Exception {
+        checkNodesRestart("plain-server", "plain-client");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSslClientNodesRestart() throws Exception {
+        checkNodesRestart("ssl-server", "ssl-client");
+    }
+
+    /**
+     * @param name1 First grid name.
+     * @param name2 Second grid name.
+     * @throws Exception If failed.
+     */
+    private void checkNodesRestart(String name1, String name2) throws Exception {
+        startGrid(name1);
+
+        spi = new FailDiscoverySpi(!name1.contains("ssl"));
+
+        startGrid(name2);
+    }
+
+    /**
+     * @param name1 First grid name.
+     * @param name2 Second grid name.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    private void checkConnection(final String name1, final String name2) throws Exception {
+        startGrid(name1);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                startGrid(name2);
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+    }
+
+    /**
+     *
+     */
+    private class FailDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private int cnt = 1;
+
+        /** */
+        private final boolean plain;
+
+        /**
+         * @param plain Plain connection flag.
+         */
+        private FailDiscoverySpi(final boolean plain) {
+            this.plain = plain;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected <T> T readMessage(final Socket sock, @Nullable final InputStream in,
+            final long timeout) throws IOException, IgniteCheckedException {
+            if (cnt-- > 0) {
+                if (plain)
+                    throw new StreamCorruptedException("Test exception");
+                else
+                    throw new SSLException("Test SSL exception");
+            }
+
+            return super.readMessage(sock, in, timeout);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 548e1a5..e6b39f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiConfigSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiFailureTimeoutSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiStartStopSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
@@ -89,6 +90,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
 
         // SSL.
         suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class));
+        suite.addTest(new TestSuite(TcpDiscoverySslSecuredUnsecuredTest.class));
 
         return suite;
     }


[06/13] ignite git commit: IGNITE-4624: Scan query optimization. This closes #1509.

Posted by nt...@apache.org.
IGNITE-4624: Scan query optimization. This closes #1509.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f57760d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f57760d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f57760d

Branch: refs/heads/master
Commit: 2f57760dbb4fba948cd035498d2c7f71869c0665
Parents: 11bbec4
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Feb 17 16:15:31 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Feb 17 18:47:21 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtCacheAdapter.java    | 19 +++-
 .../cache/query/GridCacheQueryManager.java      | 97 ++++++++++----------
 2 files changed, 64 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2f57760d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index dcd379a..be7fa55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1247,14 +1247,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         final boolean backup,
         final boolean keepBinary,
         final AffinityTopologyVersion topVer) {
+
+        return iterator(localEntriesIteratorEx(primary, backup, topVer), !keepBinary);
+    }
+
+    /**
+     * @param primary If {@code true} includes primary entries.
+     * @param backup If {@code true} includes backup entries.
+     * @param topVer Specified affinity topology version.
+     * @return Local entries iterator.
+     */
+    public Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary,
+        final boolean backup,
+        final AffinityTopologyVersion topVer) {
         assert primary || backup;
 
         if (primary && backup)
-            return iterator(entries().iterator(), !keepBinary);
+            return entries().iterator();
         else {
             final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator();
 
-            Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() {
+            return new Iterator<GridCacheMapEntry>() {
                 private GridCacheMapEntry next;
 
                 private Iterator<GridCacheMapEntry> curIt;
@@ -1311,8 +1324,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                     while (partIt.hasNext());
                 }
             };
-
-            return iterator(it, !keepBinary);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f57760d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d64dff4..14b1106 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1033,23 +1033,25 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @throws GridDhtUnreservedPartitionException If failed to reserve partition.
      */
     private GridIterator<IgniteBiTuple<K, V>> onheapIterator(
-        GridCacheQueryAdapter<?> qry,
+        final GridCacheQueryAdapter<?> qry,
         AffinityTopologyVersion topVer,
         final IgniteBiPredicate<K, V> keyValFilter,
-        boolean backups,
+        final boolean backups,
         final ExpiryPolicy plc,
         final boolean locNode) throws GridDhtUnreservedPartitionException {
-        Iterator<K> keyIter;
+        Iterator<? extends GridCacheEntryEx> entryIter;
 
         GridDhtLocalPartition locPart = null;
 
         Integer part = qry.partition();
 
-        if (part == null || cctx.isLocal()) {
-            // Performance optimization.
-            if (locNode && plc == null && !cctx.isLocal()) {
-                GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+        if (cctx.isLocal())
+            entryIter = cctx.local().allEntries().iterator();
+        else if (part == null) {
+            GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
 
+            // Performance optimization.
+            if (locNode && plc == null) {
                 final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true,
                     backups, cache.context().keepBinary(), topVer);
 
@@ -1099,12 +1101,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 };
             }
 
-            IgniteInternalCache<K, V> keepBinaryCache = cctx.cache().keepBinary();
-
-            keyIter = backups ? keepBinaryCache.keySetx().iterator() : keepBinaryCache.primaryKeySet().iterator();
+            entryIter = cache.localEntriesIteratorEx(true, backups, topVer);
         }
         else if (part < 0 || part >= cctx.affinity().partitions())
-            keyIter = new GridEmptyIterator<>();
+            return new GridEmptyIterator<>();
         else {
             final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
 
@@ -1115,28 +1115,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
                     "Partition can not be reserved.");
 
-            final GridDhtLocalPartition locPart0 = locPart;
-
-            keyIter = new Iterator<K>() {
-                private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
-
-                @Override public boolean hasNext() {
-                    return iter0.hasNext();
-                }
-
-                @Override public K next() {
-                    return (K)iter0.next();
-                }
-
-                @Override public void remove() {
-                    iter0.remove();
-                }
-            };
+            entryIter = locPart.allEntries().iterator();
         }
 
         final GridDhtLocalPartition locPart0 = locPart;
 
-        return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
+        return new PeekValueExpiryAwareIterator(entryIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
             @Override protected void onClose() {
                 super.onClose();
 
@@ -1263,18 +1247,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         ExpiryPolicy expPlc,
         final boolean keepBinary,
         boolean locNode) {
-        Iterator<K> keyIter = new Iterator<K>() {
+        Iterator<? extends GridCacheEntryEx> keyIter = new Iterator<GridCacheEntryEx>() {
             /** {@inheritDoc} */
             @Override public boolean hasNext() {
                 return it.hasNext();
             }
 
             /** {@inheritDoc} */
-            @Override public K next() {
+            @Override public GridCacheEntryEx next() {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(it.next().getKey());
 
-                    return (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
+                    final GridCacheEntryEx entryEx = cctx.cache().entryEx(key);
+
+                    return entryEx;
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException(e);
@@ -2189,8 +2175,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Gets cache queries detailed metrics.
-     * Detail metrics could be enabled by setting non-zero value via {@link CacheConfiguration#setQueryDetailMetricsSize(int)}
+     * Gets cache queries detailed metrics. Detail metrics could be enabled by setting non-zero value via {@link
+     * CacheConfiguration#setQueryDetailMetricsSize(int)}
      *
      * @return Cache queries metrics aggregated by query type and query text.
      */
@@ -3091,8 +3077,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
         /** Absolute position of each recipient. */
         private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
+
         /** */
         private CircularQueue<R> queue;
+
         /** */
         private int pruned;
 
@@ -3529,10 +3517,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private IgniteCacheExpiryPolicy expiryPlc;
 
         /** */
-        private Iterator<K> keyIt;
+        private Iterator<? extends GridCacheEntryEx> entryIt;
 
         /**
-         * @param keyIt Key iterator.
+         * @param entryIter Entry iterator.
          * @param plc Expiry policy.
          * @param topVer Topology version.
          * @param keyValFilter Key-value filter.
@@ -3540,8 +3528,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
          * @param locNode Local node.
          * @param heapOnly Heap only.
          */
-        private PeekValueExpiryAwareIterator(
-            Iterator<K> keyIt,
+        PeekValueExpiryAwareIterator(
+            Iterator<? extends GridCacheEntryEx> entryIter,
             ExpiryPolicy plc,
             AffinityTopologyVersion topVer,
             IgniteBiPredicate<K, V> keyValFilter,
@@ -3549,7 +3537,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             boolean locNode,
             boolean heapOnly
         ) {
-            this.keyIt = keyIt;
+            this.entryIt = entryIter;
             this.plc = plc;
             this.topVer = topVer;
             this.keyValFilter = keyValFilter;
@@ -3593,15 +3581,27 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private void advance() {
             IgniteBiTuple<K, V> next0 = null;
 
-            while (keyIt.hasNext()) {
+            while (entryIt.hasNext()) {
                 next0 = null;
 
-                K key = keyIt.next();
+                GridCacheEntryEx entry = entryIt.next();
+
+                if (entry.deleted())
+                    continue;
 
+                KeyCacheObject key = entry.key();
                 CacheObject val;
 
                 try {
-                    val = value(key);
+                    if (heapOnly)
+                        val = entry.peek(true, false, false, expiryPlc);
+                    else
+                        val = value(entry, entry.key());
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    assert heapOnly;
+
+                    continue;
                 }
                 catch (IgniteCheckedException e) {
                     if (log.isDebugEnabled())
@@ -3664,23 +3664,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         }
 
         /**
+         * @param entry Entry.
          * @param key Key.
          * @return Value.
          * @throws IgniteCheckedException If failed to peek value.
          */
-        private CacheObject value(K key) throws IgniteCheckedException {
+        private CacheObject value(GridCacheEntryEx entry, KeyCacheObject key) throws IgniteCheckedException {
             while (true) {
                 try {
-                    GridCacheEntryEx entry = heapOnly ? cache.peekEx(key) : cache.entryEx(key);
+                    if (entry == null)
+                        entry = cache.entryEx(key);
 
-                    if (expiryPlc != null && !heapOnly)
+                    if (expiryPlc != null)
                         entry.unswap();
 
-                    return entry != null ? entry.peek(true, !heapOnly, !heapOnly, topVer, expiryPlc) : null;
+                    return entry.peek(true, true, true, topVer, expiryPlc);
                 }
                 catch (GridCacheEntryRemovedException ignore) {
-                    if (heapOnly)
-                        return null;
+                    entry = null;
                 }
             }
         }


[07/13] ignite git commit: IGNITE-3429 - Rollback due to broken compilation

Posted by nt...@apache.org.
IGNITE-3429 - Rollback due to broken compilation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0e2df26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0e2df26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0e2df26

Branch: refs/heads/master
Commit: c0e2df26f056cd11690d821146f05e3fd938906e
Parents: 2f57760
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Feb 20 11:17:35 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Feb 20 11:17:35 2017 +0300

----------------------------------------------------------------------
 .../Hibernate5CacheKeyTypeConfiguration.java    | 52 --------------------
 .../HibernateCacheKeyTypeConfiguration.java     | 51 -------------------
 2 files changed, 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e2df26/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
deleted file mode 100644
index 886f69b..0000000
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.hibernate.config;
-
-import java.util.Objects;
-import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
-import org.apache.ignite.binary.BinaryIdentityResolver;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryTypeConfiguration;
-
-/**
- * This configuration provides correct {@link BinaryIdentityResolver} implementation
- * for Hibernate CacheKey class can be used as a key object.
- *
- * Note: for Hibernate version < 5.0 {@link HibernateCacheKeyTypeConfiguration} should be used.
-
- */
-public class Hibernate5CacheKeyTypeConfiguration extends BinaryTypeConfiguration {
-
-    /** {@inheritDoc} */
-    public Hibernate5CacheKeyTypeConfiguration() {
-        super("org.hibernate.cache.internal.CacheKeyImplementation");
-
-        setIdentityResolver(new BinaryAbstractIdentityResolver() {
-            @Override protected int hashCode0(BinaryObject obj) {
-                return obj.field("id").hashCode();
-            }
-
-            @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
-                Object obj0 = o1.field("id");
-                Object obj1 = o2.field("id");
-
-                return Objects.equals(obj0, obj1);
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e2df26/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
deleted file mode 100644
index c54292e..0000000
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.hibernate.config;
-
-import java.util.Objects;
-import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
-import org.apache.ignite.binary.BinaryIdentityResolver;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryTypeConfiguration;
-
-/**
- * This configuration provides correct {@link BinaryIdentityResolver} implementation
- * for Hibernate CacheKey class can be used as a key object.
- *
- * Note: for Hibernate version >= 5.0 {@link Hibernate5CacheKeyTypeConfiguration} should be used.
- */
-public class HibernateCacheKeyTypeConfiguration extends BinaryTypeConfiguration {
-
-    /** {@inheritDoc} */
-    public HibernateCacheKeyTypeConfiguration() {
-        super("org.hibernate.cache.spi.CacheKey");
-
-        setIdentityResolver(new BinaryAbstractIdentityResolver() {
-            @Override protected int hashCode0(BinaryObject obj) {
-                return obj.field("key").hashCode();
-            }
-
-            @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
-                Object obj0 = o1.field("key");
-                Object obj1 = o2.field("key");
-
-                return Objects.equals(obj0, obj1);
-            }
-        });
-    }
-}


[13/13] ignite git commit: Merge ignite-1.7.9 into master

Posted by nt...@apache.org.
Merge ignite-1.7.9 into master


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84880a81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84880a81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84880a81

Branch: refs/heads/master
Commit: 84880a8108a586604ab00d7bf48ba6c9f8f658ee
Parents: 94c1e7c bcb1398
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 16 16:34:28 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 16 16:34:36 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/GridManagerAdapter.java   |   4 +
 .../cache/DynamicCacheChangeBatch.java          |  14 ++
 .../service/GridServiceProcessor.java           |  49 ++---
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   5 +
 .../org/apache/ignite/spi/IgniteSpiContext.java |   6 +
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  32 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  19 ++
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   5 +
 .../tcp/internal/TcpDiscoveryNode.java          |   6 +-
 .../GridServiceContinuousQueryRedeploy.java     | 167 +++++++++++++++++
 ...veryNodeAttributesUpdateOnReconnectTest.java | 110 +++++++++++
 .../TcpDiscoverySslSecuredUnsecuredTest.java    | 185 +++++++++++++++++++
 .../tcp/TestReconnectPluginProvider.java        | 111 +++++++++++
 .../discovery/tcp/TestReconnectProcessor.java   |  93 ++++++++++
 .../testframework/GridSpiTestContext.java       |   5 +
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 .../IgniteSpiDiscoverySelfTestSuite.java        |   5 +
 .../org.apache.ignite.plugin.PluginProvider     |   1 +
 .../processors/query/h2/IgniteH2Indexing.java   |   1 +
 parent/pom.xml                                  |   1 +
 20 files changed, 793 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 99146aa,4eeafed..74cca8e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -1586,16 -1576,19 +1576,22 @@@ public class GridServiceProcessor exten
                          if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
                              return;
                      }
+                     else if (msg instanceof DynamicCacheChangeBatch) {
+                         if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+                             return;
+                     }
+                     else
+                         return;
                  }
                  else
 -                    topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
 +                    topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
  
 -                depExe.submit(new BusyRunnable() {
 +                depExe.execute(new BusyRunnable() {
                      @Override public void run0() {
 -                        ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 +                        // In case the cache instance isn't tracked by DiscoveryManager anymore.
 +                        discoCache.updateAlives(ctx.discovery());
 +
 +                        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
  
                          if (oldest != null && oldest.isLocal()) {
                              final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 41035ec,5977702..df27868
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@@ -141,7 -144,7 +142,8 @@@ public class IgniteKernalSelfTestSuite 
          suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
          suite.addTestSuite(IgniteServiceReassignmentTest.class);
          suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
 +        suite.addTestSuite(IgniteServiceDynamicCachesSelfTest.class);
+         suite.addTestSuite(GridServiceContinuousQueryRedeploy.class);
  
          suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
          suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 46fbb9e,62b47b8..7b188bf
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -83,9 -80,7 +83,10 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
  import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
  import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 +import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
  import org.apache.ignite.internal.processors.query.GridQueryCancel;
  import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
  import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;

http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/parent/pom.xml
----------------------------------------------------------------------


[09/13] ignite git commit: Implemented support for enforce join order flag. (cherry picked from commit a7f77d4)

Posted by nt...@apache.org.
Implemented support for enforce join order flag.
(cherry picked from commit a7f77d4)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57362479
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57362479
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57362479

Branch: refs/heads/master
Commit: 573624796b171b2420b87657598198f40a91f6bb
Parents: 9fcb3e7
Author: Alexey Kuznetsov <ak...@gridgain.com>
Authored: Wed Mar 1 22:09:40 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 2 10:07:25 2017 +0700

----------------------------------------------------------------------
 .../internal/visor/query/VisorQueryArgV3.java   | 51 ++++++++++++++++++++
 .../internal/visor/query/VisorQueryJob.java     |  6 +--
 .../resources/META-INF/classnames.properties    |  1 +
 3 files changed, 55 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/57362479/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
new file mode 100644
index 0000000..f32c00a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+/**
+ * Arguments for {@link VisorQueryTask}.
+ */
+public class VisorQueryArgV3 extends VisorQueryArgV2 {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /**
+     * @param cacheName Cache name for query.
+     * @param qryTxt Query text.
+     * @param distributedJoins If {@code true} then distributed joins enabled.
+     * @param enforceJoinOrder If {@code true} then enforce join order.
+     * @param loc Flag whether to execute query locally.
+     * @param pageSize Result batch size.
+     */
+    public VisorQueryArgV3(String cacheName, String qryTxt,
+        boolean distributedJoins, boolean enforceJoinOrder, boolean loc, int pageSize) {
+        super(cacheName, qryTxt, distributedJoins, loc, pageSize);
+
+        this.enforceJoinOrder = enforceJoinOrder;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/57362479/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index c66b2dd..1ac90ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -131,9 +131,8 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
                 if (scanWithFilter) {
                     boolean caseSensitive = qryTxt.startsWith(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE);
 
-                    String ptrn = caseSensitive
-                        ? qryTxt.substring(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length())
-                        : qryTxt.substring(SCAN_CACHE_WITH_FILTER.length());
+                    String ptrn = qryTxt.substring(
+                        caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length() : SCAN_CACHE_WITH_FILTER.length());
 
                     filter = new VisorQueryScanSubstringFilter(caseSensitive, ptrn);
                 }
@@ -162,6 +161,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
                 qry.setPageSize(arg.pageSize());
                 qry.setLocal(arg.local());
                 qry.setDistributedJoins(arg instanceof VisorQueryArgV2 && ((VisorQueryArgV2)arg).distributedJoins());
+                qry.setEnforceJoinOrder(arg instanceof VisorQueryArgV3 && ((VisorQueryArgV3)arg).enforceJoinOrder());
 
                 long start = U.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/57362479/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 8a6dc66..db486a5 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1792,6 +1792,7 @@ org.apache.ignite.internal.visor.node.VisorSpisConfiguration
 org.apache.ignite.internal.visor.node.VisorTransactionConfiguration
 org.apache.ignite.internal.visor.query.VisorQueryArg
 org.apache.ignite.internal.visor.query.VisorQueryArgV2
+org.apache.ignite.internal.visor.query.VisorQueryArgV3
 org.apache.ignite.internal.visor.query.VisorQueryCleanupTask
 org.apache.ignite.internal.visor.query.VisorQueryCleanupTask$VisorQueryCleanupJob
 org.apache.ignite.internal.visor.query.VisorQueryField


[11/13] ignite git commit: ignite-4577 Add non-reachable addresses at the end of addresses list

Posted by nt...@apache.org.
ignite-4577 Add non-reachable addresses at the end of addresses list


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ad8e79f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ad8e79f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ad8e79f

Branch: refs/heads/master
Commit: 7ad8e79fa1077291c50f2f535ecccde6baee0321
Parents: a30183a
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Tue Mar 7 14:32:28 2017 +0300
Committer: Evgenii Zhuravlev <ez...@gridgain.com>
Committed: Tue Mar 7 14:41:38 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       | 14 ++++++-----
 .../communication/tcp/TcpCommunicationSpi.java  | 25 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7ad8e79f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3fa3f7b..ba118cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1810,15 +1810,16 @@ public abstract class IgniteUtils {
 
     /**
      * @param addrs Addresses.
+     * @return List of reachable addresses.
      */
-    public static List<InetAddress> filterReachable(List<InetAddress> addrs) {
+    public static List<InetAddress> filterReachable(Collection<InetAddress> addrs) {
         final int reachTimeout = 2000;
 
         if (addrs.isEmpty())
             return Collections.emptyList();
 
         if (addrs.size() == 1) {
-            InetAddress addr = addrs.get(0);
+            InetAddress addr = F.first(addrs);
 
             if (reachable(addr, reachTimeout))
                 return Collections.singletonList(addr);
@@ -1834,8 +1835,7 @@ public abstract class IgniteUtils {
 
         for (final InetAddress addr : addrs) {
             futs.add(executor.submit(new Runnable() {
-                @Override
-                public void run() {
+                @Override public void run() {
                     if (reachable(addr, reachTimeout)) {
                         synchronized (res) {
                             res.add(addr);
@@ -1848,11 +1848,13 @@ public abstract class IgniteUtils {
         for (Future<?> fut : futs) {
             try {
                 fut.get();
-            } catch (InterruptedException e) {
+            }
+            catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
 
                 throw new IgniteException("Thread has been interrupted.", e);
-            } catch (ExecutionException e) {
+            }
+            catch (ExecutionException e) {
                 throw new IgniteException(e);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ad8e79f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 94b7efe..81454f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2334,6 +2334,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (isExtAddrsExist)
             addrs.addAll(extAddrs);
 
+        Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
+
+        for (InetSocketAddress addr : addrs)
+            allInetAddrs.add(addr.getAddress());
+
+        List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+
+        if (reachableInetAddrs.size() < allInetAddrs.size()) {
+            LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
+
+            for (InetSocketAddress addr : addrs) {
+                if (reachableInetAddrs.contains(addr.getAddress()))
+                    addrs0.add(addr);
+            }
+            for (InetSocketAddress addr : addrs) {
+                if (!reachableInetAddrs.contains(addr.getAddress()))
+                    addrs0.add(addr);
+            }
+
+            addrs = addrs0;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+
         boolean conn = false;
         GridCommunicationClient client = null;
         IgniteCheckedException errs = null;


[05/13] ignite git commit: IGNITE-4436 API for collecting list of running queries and cancel them. (cherry picked from commit 49237343d53ee33d44e5599cd7fe7da868ee30a1)

Posted by nt...@apache.org.
IGNITE-4436 API for collecting list of running queries and cancel them.
(cherry picked from commit 49237343d53ee33d44e5599cd7fe7da868ee30a1)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11bbec48
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11bbec48
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11bbec48

Branch: refs/heads/master
Commit: 11bbec487dc174fac1acd6b50c940840305bc75a
Parents: 382fbc9
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Feb 17 17:57:50 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Feb 17 17:57:50 2017 +0700

----------------------------------------------------------------------
 .../cache/query/GridCacheTwoStepQuery.java      |  18 +-
 .../processors/query/GridQueryIndexing.java     |  17 +-
 .../processors/query/GridQueryProcessor.java    |  26 ++-
 .../processors/query/GridRunningQueryInfo.java  | 132 ++++++++++++
 .../internal/visor/VisorMultiNodeTask.java      |   2 +-
 .../visor/query/VisorCancelQueriesTask.java     |  72 +++++++
 .../query/VisorCollectRunningQueriesTask.java   |  96 +++++++++
 .../internal/visor/query/VisorRunningQuery.java | 132 ++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  83 +++++++-
 .../query/h2/sql/GridSqlQuerySplitter.java      |   4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  60 +++++-
 .../cache/CacheSqlQueryValueCopySelfTest.java   | 208 +++++++++++++++++--
 .../cache/GridCacheCrossCacheQuerySelfTest.java |   2 +-
 .../h2/GridIndexingSpiAbstractSelfTest.java     |   7 +
 14 files changed, 821 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 8dcba2f..f53936f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -46,6 +46,9 @@ public class GridCacheTwoStepQuery {
     private boolean explain;
 
     /** */
+    private String originalSql;
+
+    /** */
     private Collection<String> spaces;
 
     /** */
@@ -67,10 +70,12 @@ public class GridCacheTwoStepQuery {
     private List<Integer> extraCaches;
 
     /**
+     * @param originalSql Original query SQL.
      * @param schemas Schema names in query.
      * @param tbls Tables in query.
      */
-    public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+    public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) {
+        this.originalSql = originalSql;
         this.schemas = schemas;
         this.tbls = tbls;
     }
@@ -196,6 +201,13 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Original query SQL.
+     */
+    public String originalSql() {
+        return originalSql;
+    }
+
+    /**
      * @return Spaces.
      */
     public Collection<String> spaces() {
@@ -223,7 +235,7 @@ public class GridCacheTwoStepQuery {
     public GridCacheTwoStepQuery copy(Object[] args) {
         assert !explain;
 
-        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
 
         cp.caches = caches;
         cp.extraCaches = extraCaches;
@@ -250,4 +262,4 @@ public class GridCacheTwoStepQuery {
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 539ebc0..1cebbb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -224,7 +224,22 @@ public interface GridQueryIndexing {
     public void onDisconnected(IgniteFuture<?> reconnectFut);
 
     /**
+     * Collects queries that have been running longer than the specified duration.
+     *
+     * @param duration Duration to check.
+     * @return Collection of long running queries.
+     */
+    public Collection<GridRunningQueryInfo> runningQueries(long duration);
+
+    /**
+     * Cancels the specified queries.
+     *
+     * @param queries IDs of the queries to cancel.
+     */
+    public void cancelQueries(Collection<Long> queries);
+
+    /**
      * Cancels all executing queries.
      */
     public void cancelAllQueries();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index c2e5717..0a0d166 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -44,7 +44,6 @@ import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryField;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryType;
@@ -118,7 +117,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000;
 
     /** */
-    private static Set<Class<?>> SQL_TYPES = new HashSet<>(F.<Class<?>>asList(
+    private static final Set<Class<?>> SQL_TYPES = new HashSet<>(F.<Class<?>>asList(
         Integer.class,
         Boolean.class,
         Byte.class,
@@ -913,6 +912,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Collects queries that have been running longer than the specified duration.
+     *
+     * @param duration Duration to check.
+     * @return Collection of long running queries, or an empty list if {@code moduleEnabled()} is {@code false}.
+     */
+    public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+        if (moduleEnabled())
+            return idx.runningQueries(duration);
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * Cancels the specified queries; does nothing if {@code moduleEnabled()} is {@code false}.
+     *
+     * @param queries IDs of the queries to cancel.
+     */
+    public void cancelQueries(Collection<Long> queries) {
+        if (moduleEnabled())
+            idx.cancelQueries(queries);
+    }
+
+    /**
      * @param sqlQry Sql query.
      * @param params Params.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
new file mode 100644
index 0000000..d77c8c0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
+/**
+ * Descriptor of a running query: its ID, text, type, cache, start time, query cancel and local flag.
+ */
+public class GridRunningQueryInfo {
+    /** Query ID. */
+    private final long id;
+
+    /** Query text. */
+    private final String qry;
+
+    /** Query type. */
+    private final GridCacheQueryType qryType;
+
+    /** Cache where query was executed. */
+    private final String cache;
+
+    /** Query start time. */
+    private final long startTime;
+
+    /** Query cancel; {@code null} when the query cannot be cancelled. */
+    private final GridQueryCancel cancel;
+
+    /** Local query flag. */
+    private final boolean loc;
+
+    /**
+     * @param id Query ID; must not be {@code null} because it is unboxed into a primitive field.
+     * @param qry Query text.
+     * @param qryType Query type.
+     * @param cache Cache where query was executed.
+     * @param startTime Query start time.
+     * @param cancel Query cancel, or {@code null} if the query cannot be cancelled.
+     * @param loc Local query flag.
+     */
+    public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+        GridQueryCancel cancel, boolean loc) {
+        this.id = id;
+        this.qry = qry;
+        this.qryType = qryType;
+        this.cache = cache;
+        this.startTime = startTime;
+        this.cancel = cancel;
+        this.loc = loc;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public Long id() {
+        return id;
+    }
+
+    /**
+     * @return Query text.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Query type.
+     */
+    public GridCacheQueryType queryType() {
+        return qryType;
+    }
+
+    /**
+     * @return Cache where query was executed.
+     */
+    public String cache() {
+        return cache;
+    }
+
+    /**
+     * @return Query start time.
+     */
+    public long startTime() {
+        return startTime;
+    }
+
+    /**
+     * @param curTime Current time.
+     * @param duration Threshold after which a query is considered long running.
+     * @return {@code true} if {@code curTime - startTime} is strictly greater than {@code duration}.
+     */
+    public boolean longQuery(long curTime, long duration) {
+        return curTime - startTime > duration;
+    }
+
+    /**
+     * Cancels the query; does nothing if no query cancel was supplied.
+     */
+    public void cancel() {
+        if (cancel != null)
+            cancel.cancel();
+    }
+
+    /**
+     * @return {@code true} if query can be cancelled.
+     */
+    public boolean cancelable() {
+        return cancel != null;
+    }
+
+    /**
+     * @return {@code true} if query is local.
+     */
+    public boolean local() {
+        return loc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
index 57f1346..ece1a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
@@ -130,4 +130,4 @@ public abstract class VisorMultiNodeTask<A, R, J> implements ComputeTask<VisorTa
                 logFinish(ignite.log(), getClass(), start);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
new file mode 100644
index 0000000..a6f2d82
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorOneNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to cancel queries by their IDs.
+ */
+@GridInternal
+public class VisorCancelQueriesTask extends VisorOneNodeTask<Collection<Long>, Void> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorCancelQueriesJob job(Collection<Long> arg) {
+        return new VisorCancelQueriesJob(arg, debug);
+    }
+
+    /** {@inheritDoc} Always returns {@code null}: the task produces no result. */
+    @Nullable @Override protected Void reduce0(List<ComputeJobResult> results) throws IgniteException {
+        return null;
+    }
+
+    /**
+     * Job to cancel queries on node.
+     */
+    private static class VisorCancelQueriesJob extends VisorJob<Collection<Long>, Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with specified argument.
+         *
+         * @param arg IDs of the queries to cancel.
+         * @param debug Flag indicating whether debug information should be printed into node log.
+         */
+        protected VisorCancelQueriesJob(@Nullable Collection<Long> arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} Passes the IDs to the query processor's {@code cancelQueries()}. */
+        @Override protected Void run(@Nullable Collection<Long> queries) throws IgniteException {
+            ignite.context().query().cancelQueries(queries);
+
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java
new file mode 100644
index 0000000..2b40e61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to collect currently running queries; the reduced result maps node ID to the queries collected on that node.
+ */
+@GridInternal
+public class VisorCollectRunningQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorRunningQuery>>, Collection<VisorRunningQuery>> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorCollectRunningQueriesJob job(Long arg) {
+        return new VisorCollectRunningQueriesJob(arg, debug);
+    }
+
+    /** {@inheritDoc} Results of jobs that finished with an exception are skipped. */
+    @Nullable @Override protected Map<UUID, Collection<VisorRunningQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException {
+        Map<UUID, Collection<VisorRunningQuery>> map = new HashMap<>();
+
+        for (ComputeJobResult res : results)
+            if (res.getException() == null) {
+                Collection<VisorRunningQuery> queries = res.getData();
+
+                map.put(res.getNode().id(), queries);
+            }
+
+        return map;
+    }
+
+    /**
+     * Job to collect currently running queries from node.
+     */
+    private static class VisorCollectRunningQueriesJob extends VisorJob<Long, Collection<VisorRunningQuery>> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with specified argument.
+         *
+         * @param arg Duration threshold passed to {@code runningQueries()}; {@code null} is treated as 0.
+         * @param debug Flag indicating whether debug information should be printed into node log.
+         */
+        protected VisorCollectRunningQueriesJob(@Nullable Long arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} Each query's duration is computed as current time minus its start time. */
+        @Override protected Collection<VisorRunningQuery> run(@Nullable Long duration) throws IgniteException {
+            Collection<GridRunningQueryInfo> queries = ignite.context().query()
+                .runningQueries(duration != null ? duration : 0);
+
+            Collection<VisorRunningQuery> res = new ArrayList<>(queries.size());
+
+            long curTime = U.currentTimeMillis();
+
+            for (GridRunningQueryInfo qry : queries)
+                res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(),
+                    qry.startTime(), curTime - qry.startTime(),
+                    qry.cancelable(), qry.local()));
+
+            return res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
new file mode 100644
index 0000000..fc6bc7a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
+/**
+ * Serializable descriptor of a running query.
+ */
+public class VisorRunningQuery implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Query ID. */
+    private long id;
+
+    /** Query text. */
+    private String qry;
+
+    /** Query type. */
+    private GridCacheQueryType qryType;
+
+    /** Cache name for query. */
+    private String cache;
+
+    /** Query start time. */
+    private long startTime;
+
+    /** Query current duration. */
+    private long duration;
+
+    /** {@code true} if query can be canceled. */
+    private boolean cancellable;
+
+    /** {@code true} if query is local. */
+    private boolean loc;
+
+    /**
+     * @param id Query ID.
+     * @param qry Query text.
+     * @param qryType Query type.
+     * @param cache Cache where query was executed.
+     * @param startTime Query start time.
+     * @param duration Query current duration.
+     * @param cancellable {@code true} if query can be canceled.
+     * @param loc {@code true} if query is local.
+     */
+    public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache,
+        long startTime, long duration,
+        boolean cancellable, boolean loc) {
+        this.id = id;
+        this.qry = qry;
+        this.qryType = qryType;
+        this.cache = cache;
+        this.startTime = startTime;
+        this.duration = duration;
+        this.cancellable = cancellable;
+        this.loc = loc;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long getId() {
+        return id;
+    }
+
+    /**
+     * @return Query text.
+     */
+    public String getQuery() {
+        return qry;
+    }
+
+    /**
+     * @return Query type.
+     */
+    public GridCacheQueryType getQueryType() {
+        return qryType;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String getCache() {
+        return cache;
+    }
+
+    /**
+     * @return Query start time.
+     */
+    public long getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @return Query duration.
+     */
+    public long getDuration() {
+        return duration;
+    }
+
+    /**
+     * @return {@code true} if query can be cancelled.
+     */
+    public boolean isCancelable() {
+        return cancellable;
+    }
+
+    /**
+     * @return {@code true} if query is local.
+     */
+    public boolean isLocal() {
+        return loc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cbf2ebd..62b47b8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -52,6 +52,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -79,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -172,6 +174,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getString;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
@@ -279,9 +284,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final Map<String, String> space2schema = new ConcurrentHashMap8<>();
 
     /** */
+    private AtomicLong qryIdGen;
+
+    /** */
     private GridSpinBusyLock busyLock;
 
     /** */
+    private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>();
+
+    /** */
     private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
         @Nullable @Override public ConnectionWrapper get() {
             ConnectionWrapper c = super.get();
@@ -751,8 +762,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         IndexingQueryFilter filters) throws IgniteCheckedException {
         TableDescriptor tbl = tableDescriptor(spaceName, type);
 
-        if (tbl != null && tbl.luceneIdx != null)
-            return tbl.luceneIdx.query(qry, filters);
+        if (tbl != null && tbl.luceneIdx != null) {
+            GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, spaceName,
+                U.currentTimeMillis(), null, true);
+
+            try {
+                runs.put(run.id(), run);
+
+                return tbl.luceneIdx.query(qry, filters);
+            }
+            finally {
+                runs.remove(run.id());
+            }
+        }
 
         return new GridEmptyCloseableIterator<>();
     }
@@ -796,6 +818,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 GridH2QueryContext.set(ctx);
 
+                GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS,
+                    spaceName, U.currentTimeMillis(), cancel, true);
+
+                runs.putIfAbsent(run.id(), run);
+
                 try {
                     ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
 
@@ -803,6 +830,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
                 finally {
                     GridH2QueryContext.clearThreadLocal();
+
+                    runs.remove(run.id());
                 }
             }
         };
@@ -1061,6 +1090,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
 
+        GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
+            U.currentTimeMillis(), null, true);
+
+        runs.put(run.id(), run);
+
         try {
             ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
 
@@ -1068,6 +1102,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
         finally {
             GridH2QueryContext.clearThreadLocal();
+
+            runs.remove(run.id());
         }
     }
 
@@ -1692,6 +1728,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         this.busyLock = busyLock;
 
+        qryIdGen = new AtomicLong();
+
         if (SysProperties.serializeJavaObject) {
             U.warn(log, "Serialization of Java objects in H2 was enabled.");
 
@@ -1742,7 +1780,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             marshaller = ctx.config().getMarshaller();
 
             mapQryExec = new GridMapQueryExecutor(busyLock);
-            rdcQryExec = new GridReduceQueryExecutor(busyLock);
+            rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock);
 
             mapQryExec.start(ctx, this);
             rdcQryExec.start(ctx, this);
@@ -2196,6 +2234,37 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         return cols;
     }
 
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+        Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+        res.addAll(runs.values());
+        res.addAll(rdcQryExec.longRunningQueries(duration));
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelQueries(Collection<Long> queries) {
+        if (!F.isEmpty(queries)) {
+            for (Long qryId : queries) {
+                GridRunningQueryInfo run = runs.get(qryId);
+
+                if (run != null)
+                    run.cancel();
+            }
+
+            rdcQryExec.cancelQueries(queries);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelAllQueries() {
+        for (Connection conn : conns)
+            U.close(conn, log);
+    }
+
     /**
      * Wrapper to store connection and flag is schema set or not.
      */
@@ -3086,10 +3155,4 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             lastUsage = U.currentTimeMillis();
         }
     }
-
-    /** {@inheritDoc} */
-    @Override public void cancelAllQueries() {
-        for (Connection conn : conns)
-            U.close(conn, log);
-    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 7d43bf6..8284c45 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -170,7 +170,7 @@ public class GridSqlQuerySplitter {
         qry = collectAllTables(qry, schemas, tbls);
 
         // Build resulting two step query.
-        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls);
+        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(qry.getSQL(), schemas, tbls);
 
         // Map query will be direct reference to the original query AST.
         // Thus all the modifications will be performed on the original AST, so we should be careful when
@@ -954,4 +954,4 @@ public class GridSqlQuerySplitter {
     private static GridSqlFunction function(GridSqlFunctionType type) {
         return new GridSqlFunction(type);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 40b11973..3f886ee 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
@@ -99,6 +100,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 
 /**
@@ -121,7 +123,7 @@ public class GridReduceQueryExecutor {
     private IgniteLogger log;
 
     /** */
-    private final AtomicLong reqIdGen = new AtomicLong();
+    private final AtomicLong qryIdGen;
 
     /** */
     private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
@@ -168,9 +170,11 @@ public class GridReduceQueryExecutor {
     };
 
     /**
+     * @param qryIdGen Query ID generator.
      * @param busyLock Busy lock.
      */
-    public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
+    public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) {
+        this.qryIdGen = qryIdGen;
         this.busyLock = busyLock;
     }
 
@@ -494,11 +498,13 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            final long qryReqId = reqIdGen.incrementAndGet();
+            final long qryReqId = qryIdGen.incrementAndGet();
 
             final String space = cctx.name();
 
-            final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize());
+            final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), space,
+                h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize(),
+                U.currentTimeMillis(), cancel);
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
@@ -1304,10 +1310,46 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Collect queries that have already been running longer than the specified duration.
+     *
+     * @param duration Duration to check.
+     * @return Collection of running query info objects for long running queries.
+     */
+    public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
+        Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+        long curTime = U.currentTimeMillis();
+
+        for (QueryRun run : runs.values()) {
+            if (run.qry.longQuery(curTime, duration))
+                res.add(run.qry);
+        }
+
+        return res;
+    }
+
+    /**
+     * Cancel specified queries.
+     *
+     * @param queries Queries IDs to cancel.
+     */
+    public void cancelQueries(Collection<Long> queries) {
+        for (Long qryId : queries) {
+            QueryRun run = runs.get(qryId);
+
+            if (run != null)
+                run.qry.cancel();
+        }
+    }
+
+    /**
      * Query run.
      */
     private static class QueryRun {
         /** */
+        private final GridRunningQueryInfo qry;
+
+        /** */
         private final List<GridMergeIndex> idxs;
 
         /** */
@@ -1323,11 +1365,17 @@ public class GridReduceQueryExecutor {
         private final AtomicReference<Object> state = new AtomicReference<>();
 
         /**
+         * @param id Query ID.
+         * @param qry Query text.
+         * @param cache Cache where query was executed.
          * @param conn Connection.
          * @param idxsCnt Number of indexes.
          * @param pageSize Page size.
+         * @param startTime Start time.
+         * @param cancel Query cancel handler.
          */
-        private QueryRun(Connection conn, int idxsCnt, int pageSize) {
+        private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
+            this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
             this.conn = (JdbcConnection)conn;
             this.idxs = new ArrayList<>(idxsCnt);
             this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
@@ -1410,4 +1458,4 @@ public class GridReduceQueryExecutor {
             return copy(msg, n, partsMap);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
index e47e893..66e7e4a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
@@ -17,15 +17,23 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -54,6 +62,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
 
         cc.setCopyOnRead(true);
         cc.setIndexedTypes(Integer.class, Value.class);
+        cc.setSqlFunctionClasses(TestSQLFunctions.class);
 
         cfg.setCacheConfiguration(cc);
 
@@ -72,7 +81,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
         IgniteCache<Integer, Value> cache = grid(0).cache(null);
 
         for (int i = 0; i < KEYS; i++)
-            cache.put(i, new Value("before"));
+            cache.put(i, new Value(i, "before-" + i));
     }
 
     /** {@inheritDoc} */
@@ -195,17 +204,148 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
         check(cache);
     }
 
-    /** */
-    private static class Value {
-        /** */
-        private String str;
+    /**
+     * Run specified query in separate thread.
+     *
+     * @param qry Query to execute.
+     */
+    private IgniteInternalFuture<?> runQueryAsync(final Query<?> qry) throws Exception {
+        return multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    log.info(">>> Query started");
+
+                    grid(0).cache(null).query(qry).getAll();
+
+                    log.info(">>> Query finished");
+                }
+                catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 1, "run-query");
+    }
 
-        /**
-         * @param str String.
-         */
-        public Value(String str) {
-            this.str = str;
+    /**
+     * Test collecting info about running SQL fields queries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRunningSqlFieldsQuery() throws Exception {
+        IgniteInternalFuture<?> fut = runQueryAsync(new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3"));
+
+        Thread.sleep(500);
+
+        GridQueryProcessor qryProc = grid(0).context().query();
+
+        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        fut.get();
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3");
+        qry.setLocal(true);
+
+        fut = runQueryAsync(qry);
+
+        Thread.sleep(500);
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        fut.get();
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+    }
+
+    /**
+     * Test collecting info about running SQL queries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRunningSqlQuery() throws Exception {
+        IgniteInternalFuture<?> fut = runQueryAsync(new SqlQuery<Integer, Value>(Value.class, "id > sleep(100)"));
+
+        Thread.sleep(500);
+
+        GridQueryProcessor qryProc = grid(0).context().query();
+
+        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        fut.get();
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+
+        SqlQuery<Integer, Value> qry = new SqlQuery<>(Value.class, "id > sleep(100)");
+        qry.setLocal(true);
+
+        fut = runQueryAsync(qry);
+
+        Thread.sleep(500);
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        fut.get();
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+    }
+
+    /**
+     * Test cancelling a running SQL fields query.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCancelingSqlFieldsQuery() throws Exception {
+        runQueryAsync(new SqlFieldsQuery("select * from (select _val, sleep(100) from Value limit 50)"));
+
+        Thread.sleep(500);
+
+        final GridQueryProcessor qryProc = grid(0).context().query();
+
+        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        final Collection<GridRunningQueryInfo> finalQueries = queries;
+
+        for (GridRunningQueryInfo query : finalQueries)
+            qryProc.cancelQueries(Collections.singleton(query.id()));
+
+        int n = 100;
+
+        // Give cluster some time to cancel query and clean up resources.
+        while (n > 0) {
+            Thread.sleep(100);
+
+            queries = qryProc.runningQueries(0);
+
+            if (queries.isEmpty())
+                break;
+
+            log.info(">>>> Wait for cancel: " + n);
+
+            n--;
         }
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
     }
 
     /**
@@ -218,9 +358,53 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
         for (Cache.Entry<Integer, Value> entry : cache) {
             cnt++;
 
-            assertEquals("before", entry.getValue().str);
+            assertEquals("before-" + entry.getKey(), entry.getValue().str);
         }
 
         assertEquals(KEYS, cnt);
     }
-}
\ No newline at end of file
+
+    /** */
+    private static class Value {
+        /** */
+        @QuerySqlField
+        private int id;
+
+        /** */
+        @QuerySqlField
+        private String str;
+
+        /**
+         * @param id ID.
+         * @param str String.
+         */
+        public Value(int id, String str) {
+            this.id = id;
+            this.str = str;
+        }
+    }
+
+    /**
+     * Utility class with custom SQL functions.
+     */
+    public static class TestSQLFunctions {
+        /**
+         * Sleep function to simulate long running queries.
+         *
+         * @param x Time to sleep.
+         * @return Return specified argument.
+         */
+        @QuerySqlFunction
+        public static long sleep(long x) {
+            if (x >= 0)
+                try {
+                    Thread.sleep(x);
+                }
+                catch (InterruptedException ignored) {
+                    // No-op.
+                }
+
+            return x;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 1f10593..01fefa3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -477,4 +477,4 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
             return storeId;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index ad8a7e3..814d0e0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -106,6 +106,9 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         spi.registerCache(null, cacheCfg("B"));
     }
 
+    /**
+     * @param name Name.
+     */
     private CacheConfiguration cacheCfg(String name) {
         CacheConfiguration<?,?> cfg = new CacheConfiguration<>();
 
@@ -114,6 +117,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         return cfg;
     }
 
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         idx.stop();
 
@@ -182,6 +186,9 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         return idx;
     }
 
+    /**
+     * @return {@code true} if OFF-HEAP mode should be tested.
+     */
     protected boolean offheap() {
         return false;
     }


[08/13] ignite git commit: IGNITE-4740 - Fix. Service could be deployed/undeployed twice on concurrent cancel and discovery event.

Posted by nt...@apache.org.
IGNITE-4740 - Fix. Service could be deployed/undeployed twice on concurrent cancel and discovery event.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fcb3e74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fcb3e74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fcb3e74

Branch: refs/heads/master
Commit: 9fcb3e74f91c8497b7b1358cdff40950cdf5c568
Parents: c0e2df2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Feb 28 16:05:06 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Feb 28 16:05:06 2017 +0300

----------------------------------------------------------------------
 .../cache/DynamicCacheChangeBatch.java          |  14 ++
 .../service/GridServiceProcessor.java           |  49 +++---
 .../GridServiceContinuousQueryRedeploy.java     | 167 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 4 files changed, 208 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 4dcff9b..a250063 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -113,6 +113,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         return clientReconnect;
     }
 
+    /**
+     * @return {@code True} if request should trigger partition exchange.
+     */
+    public boolean exchangeNeeded() {
+        if (reqs != null) {
+            for (DynamicCacheChangeRequest req : reqs) {
+                if (req.exchangeNeeded())
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheChangeBatch.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 3690f35..4eeafed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -65,10 +65,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.continuous.AbstractContinuousMessage;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1468,19 +1470,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         else {
             String name = e.getKey().name();
 
-            svcName.set(name);
-
-            Collection<ServiceContextImpl> ctxs;
-
-            synchronized (locSvcs) {
-                ctxs = locSvcs.remove(name);
-            }
-
-            if (ctxs != null) {
-                synchronized (ctxs) {
-                    cancel(ctxs, ctxs.size());
-                }
-            }
+            undeploy(name);
 
             // Finish deployment futures if undeployment happened.
             GridFutureAdapter<?> fut = depFuts.remove(name);
@@ -1586,6 +1576,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                         if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
                             return;
                     }
+                    else if (msg instanceof DynamicCacheChangeBatch) {
+                        if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+                            return;
+                    }
+                    else
+                        return;
                 }
                 else
                     topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
@@ -1771,21 +1767,26 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             }
         }
         // Handle undeployment.
-        else {
-            String name = e.getKey().name();
+        else
+            undeploy(e.getKey().name());
+    }
 
-            svcName.set(name);
 
-            Collection<ServiceContextImpl> ctxs;
+    /**
+     * @param name Name.
+     */
+    private void undeploy(String name) {
+        svcName.set(name);
 
-            synchronized (locSvcs) {
-                ctxs = locSvcs.remove(name);
-            }
+        Collection<ServiceContextImpl> ctxs;
 
-            if (ctxs != null) {
-                synchronized (ctxs) {
-                    cancel(ctxs, ctxs.size());
-                }
+        synchronized (locSvcs) {
+            ctxs = locSvcs.remove(name);
+        }
+
+        if (ctxs != null) {
+            synchronized (ctxs) {
+                cancel(ctxs, ctxs.size());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
new file mode 100644
index 0000000..1a9ef3a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that concurrent service cancellation and ContinuousQuery registration don't cause
+ * service redeployment.
+ */
+public class GridServiceContinuousQueryRedeploy extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "TEST_CACHE";
+
+    /** */
+    private static final String TEST_KEY = "TEST_KEY";
+
+    /** */
+    private static final String SERVICE_NAME = "service1";
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServiceRedeploymentAfterCancel() throws Exception {
+        final Ignite ignite = startGrid(0);
+
+        final IgniteCache<Object, Object> managementCache = ignite.getOrCreateCache(CACHE_NAME);
+
+        final ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+        final List<Object> evts = Collections.synchronizedList(new ArrayList<>());
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(
+                Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException {
+                for (CacheEntryEvent<?, ?> event : iterable)
+                    evts.add(event);
+            }
+        });
+
+        int iterations = 100;
+
+        while (iterations-- > 0) {
+            QueryCursor quorumCursor = managementCache.query(qry);
+
+            IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    System.out.println("Deploy " + SERVICE_NAME);
+                    deployService(ignite);
+
+                    return null;
+                }
+            });
+
+            IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    System.out.println("Undeploy " + SERVICE_NAME);
+                    ignite.services().cancel(SERVICE_NAME);
+
+                    return null;
+                }
+            });
+
+            fut1.get();
+            fut2.get();
+
+            U.sleep(100);
+
+            assert evts.size() <= 1 : evts.size();
+
+            ignite.services().cancel("service1");
+
+            evts.clear();
+
+            quorumCursor.close();
+        }
+
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    private void deployService(final Ignite ignite) {
+        ServiceConfiguration svcCfg = new ServiceConfiguration();
+
+        svcCfg.setService(new ManagementService());
+        svcCfg.setName(SERVICE_NAME);
+        svcCfg.setTotalCount(1);
+        svcCfg.setMaxPerNodeCount(1);
+        svcCfg.setNodeFilter(new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode node) {
+                return !node.isClient();
+            }
+        });
+
+        ignite.services().deploy(svcCfg);
+    }
+
+    /**
+     * Test service that puts a marker entry into the test cache when it is initialized.
+     */
+    public static class ManagementService implements Service {
+        /** */
+        private final String name = UUID.randomUUID().toString();
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            System.out.println(name + " shutdown.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void init(ServiceContext ctx) throws Exception {
+            System.out.println(name + " initializing.");
+
+            ignite.cache(CACHE_NAME).put(TEST_KEY, name + " init");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            // No-op
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 350b715..5977702 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest
 import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
 import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceClientNodeTest;
+import org.apache.ignite.internal.processors.service.GridServiceContinuousQueryRedeploy;
 import org.apache.ignite.internal.processors.service.GridServicePackagePrivateSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeConfigSelfTest;
 import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeSelfTest;
@@ -143,6 +144,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
         suite.addTestSuite(IgniteServiceReassignmentTest.class);
         suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
+        suite.addTestSuite(GridServiceContinuousQueryRedeploy.class);
 
         suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
         suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);