You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2017/03/16 13:36:40 UTC
[01/13] ignite git commit: IGNITE-4641 - Refresh client attributes during reconnect
Repository: ignite
Updated Branches:
refs/heads/master 94c1e7cb5 -> 84880a810
IGNITE-4641 - Refresh client attributes during reconnect
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8874f99f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8874f99f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8874f99f
Branch: refs/heads/master
Commit: 8874f99f44dc2edf08a525619edb49d5db70b938
Parents: 0a43665
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Feb 14 18:44:57 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Feb 14 18:44:57 2017 +0300
----------------------------------------------------------------------
.../internal/managers/GridManagerAdapter.java | 4 +
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 5 +
.../org/apache/ignite/spi/IgniteSpiContext.java | 6 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 2 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +
.../tcp/internal/TcpDiscoveryNode.java | 6 +-
...veryNodeAttributesUpdateOnReconnectTest.java | 110 ++++++++++++++++++
.../tcp/TestReconnectPluginProvider.java | 111 +++++++++++++++++++
.../discovery/tcp/TestReconnectProcessor.java | 93 ++++++++++++++++
.../testframework/GridSpiTestContext.java | 5 +
.../IgniteSpiDiscoverySelfTestSuite.java | 3 +
.../org.apache.ignite.plugin.PluginProvider | 1 +
parent/pom.xml | 1 +
13 files changed, 349 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 584cc56..25cc715 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -569,6 +569,10 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
+ @Override public Map<String, Object> nodeAttributes() {
+ return ctx.nodeAttributes();
+ }
+
/**
* @param e Exception to handle.
* @return GridSpiException Converted exception.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 219d07b..8879364 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -928,5 +928,10 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> nodeAttributes() {
+ return Collections.emptyMap();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 5eb5227..96b3e61 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
@@ -352,4 +353,9 @@ public interface IgniteSpiContext {
* @param c Timeout object.
*/
public void removeTimeoutObject(IgniteSpiTimeoutObject c);
+
+ /**
+ * @return Current node attributes.
+ */
+ public Map<String, Object> nodeAttributes();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 39c539c..932e7d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -628,7 +628,7 @@ class ClientImpl extends TcpDiscoveryImpl {
TcpDiscoveryNode node = locNode;
if (locNode.order() > 0)
- node = locNode.clientReconnectNode();
+ node = locNode.clientReconnectNode(spi.spiCtx.nodeAttributes());
msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 45933e1..00ae97d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -409,6 +409,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** */
private boolean clientReconnectDisabled;
+ /** */
+ protected IgniteSpiContext spiCtx;
+
/** {@inheritDoc} */
@Override public String getSpiState() {
return impl.getSpiState();
@@ -1161,6 +1164,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
@Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
super.onContextInitialized0(spiCtx);
+ this.spiCtx = spiCtx;
+
ctxInitLatch.countDown();
ipFinder.onSpiContextInitialized(spiCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 307aefe..d8b1fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -513,13 +514,14 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
}
/**
+ * @param nodeAttrs Current node attributes.
* @return Copy of local node for client reconnect request.
*/
- public TcpDiscoveryNode clientReconnectNode() {
+ public TcpDiscoveryNode clientReconnectNode(Map<String, Object> nodeAttrs) {
TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver,
null);
- node.attrs = attrs;
+ node.attrs = Collections.unmodifiableMap(new HashMap<>(nodeAttrs));
node.clientRouterNodeId = clientRouterNodeId;
return node;
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
new file mode 100644
index 0000000..56dc4ec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.reconnectClientNode;
+
+/**
+ * Checks that a reconnecting client sends the node attributes currently held by the kernal context: the client starts with user attribute "test" = "1" and the server is expected to observe "2" after the reconnect.
+ */
+public class TcpDiscoveryNodeAttributesUpdateOnReconnectTest extends GridCommonAbstractTest {
+ /** Value of the "test" attribute of the node reported by the most recent EVT_NODE_JOINED event on the server. */
+ private volatile String rejoinAttr;
+
+ /** Logger handed to {@code reconnectClientNode}. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (gridName.contains("client")) { // Only nodes whose name contains "client" get the attribute and client mode.
+ Map<String, String> attrs = new HashMap<>();
+
+ attrs.put("test", "1"); // Initial value; the test expects a different value after reconnect.
+
+ cfg.setUserAttributes(attrs);
+ cfg.setClientMode(true);
+ }
+
+ IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi spi = new IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi();
+
+ TcpDiscoveryIpFinder finder = ((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder();
+
+ spi.setIpFinder(finder); // Reuse the IP finder of the discovery SPI that the super configuration provided.
+
+ cfg.setDiscoverySpi(spi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ TestReconnectPluginProvider.enabled = false; // Provider stops returning TestReconnectProcessor from createComponent().
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ TestReconnectPluginProvider.enabled = true; // Provider returns TestReconnectProcessor for GridSecurityProcessor requests.
+ }
+
+ /** Starts a server and a client, reconnects the client and asserts the "test" attribute value recorded on the server.
+ * @throws Exception If failed.
+ */
+ public void testReconnect() throws Exception {
+ Ignite srv = startGrid("server");
+
+ IgniteEvents evts = srv.events();
+
+ evts.enableLocal(EventType.EVTS_DISCOVERY_ALL);
+ evts.localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ ClusterNode node = ((DiscoveryEvent)evt).eventNode();
+
+ rejoinAttr = node.attribute("test"); // Overwritten on every join event, so the latest join wins.
+
+ return true;
+ }
+ }, EventType.EVT_NODE_JOINED);
+
+ Ignite client = startGrid("client");
+
+ reconnectClientNode(log, client, srv, null); // NOTE(review): presumably blocks until the client has rejoined - confirm in IgniteClientReconnectAbstractTest.
+
+ assertEquals("2", rejoinAttr); // "2" is the value TestReconnectProcessor.onDisconnected() stores.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java
new file mode 100644
index 0000000..692774c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectPluginProvider.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test plugin provider: while {@link #enabled} is set it supplies a {@link TestReconnectProcessor} whenever a {@code GridSecurityProcessor} component is requested; every other callback is a no-op.
+ */
+public class TestReconnectPluginProvider implements PluginProvider {
+ /** Kernal context captured in {@code initExtensions} and passed to the created processor. */
+ private GridKernalContext igniteCtx;
+
+ /** Switch read by {@code createComponent}; a processor is created only while this is {@code true}. */
+ public static volatile boolean enabled;
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "TestReconnectPlugin";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return "1.0";
+ }
+
+ /** {@inheritDoc} */
+ @Override public String copyright() {
+ return "";
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+ igniteCtx = ((IgniteKernal)ctx.grid()).context(); // Remember the kernal context for createComponent().
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() throws IgniteCheckedException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object createComponent(PluginContext ctx, Class cls) {
+ if (enabled && GridSecurityProcessor.class.equals(cls)) // Only the security processor is replaced, and only while enabled.
+ return new TestReconnectProcessor(igniteCtx);
+
+ return null; // No component supplied for any other request.
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgnitePlugin plugin() {
+ return new IgnitePlugin() {};
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java
new file mode 100644
index 0000000..f0ed35c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestReconnectProcessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
+import org.apache.ignite.internal.processors.security.SecurityContext;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.plugin.security.AuthenticationContext;
+import org.apache.ignite.plugin.security.SecurityCredentials;
+import org.apache.ignite.plugin.security.SecurityException;
+import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.plugin.security.SecuritySubject;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stub security processor for tests: the security methods return {@code null}/{@code false} or do nothing, and on disconnect it sets node attribute "test" to "2" in the kernal context.
+ */
+public class TestReconnectProcessor extends GridProcessorAdapter implements GridSecurityProcessor {
+ /**
+ * @param ctx Kernal context.
+ */
+ protected TestReconnectProcessor(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public SecurityContext authenticateNode(ClusterNode node,
+ SecurityCredentials cred) throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isGlobalNodeAuthentication() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SecurityContext authenticate(AuthenticationContext ctx) throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<SecuritySubject> authenticatedSubjects() throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SecuritySubject authenticatedSubject(UUID subjId) throws IgniteCheckedException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void authorize(String name, SecurityPermission perm,
+ @Nullable SecurityContext securityCtx) throws SecurityException {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionExpired(UUID subjId) {
+ // No-op
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enabled() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ ctx.addNodeAttribute("test", "2"); // Changes the attribute while disconnected so a later rejoin can be checked for the new value.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index ac50ef9..1c8acbc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -601,6 +601,11 @@ public class GridSpiTestContext implements IgniteSpiContext {
timeoutProcessor.removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> nodeAttributes() {
+ return Collections.emptyMap();
+ }
+
/**
* @param cacheName Cache name.
* @return Map representing cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 5f870a4..548e1a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest;
@@ -84,6 +85,8 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpDiscoveryRestartTest.class));
suite.addTest(new TestSuite(TcpDiscoveryMultiThreadedTest.class));
+ suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class));
+
// SSL.
suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class));
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
----------------------------------------------------------------------
diff --git a/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
new file mode 100644
index 0000000..a7fdf43
--- /dev/null
+++ b/modules/core/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -0,0 +1 @@
+org.apache.ignite.spi.discovery.tcp.TestReconnectPluginProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8874f99f/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 88532a7..8bf5dde 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -821,6 +821,7 @@
<exclude>src/main/java/META-INF/services/javax.cache.spi.CachingProvider</exclude><!--cannot be changed-->
<exclude>src/main/java/org/jetbrains/annotations/*.java</exclude><!--copyright-->
<exclude>src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider</exclude><!--cannot be changed-->
+ <exclude>/src/test/resources/META-INF/services/org.apache.ignite.plugin.PluginProvider</exclude>
<exclude>dev-tools/IGNITE-*.patch</exclude>
<exclude>dev-tools/.gradle/**/*</exclude>
<exclude>dev-tools/gradle/wrapper/**/*</exclude>
[10/13] ignite git commit: IGNITE-4717 Fixed hangs in VisorCacheClearTask. (cherry picked from commit 76f3060)
Posted by nt...@apache.org.
IGNITE-4717 Fixed hangs in VisorCacheClearTask.
(cherry picked from commit 76f3060)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a30183ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a30183ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a30183ac
Branch: refs/heads/master
Commit: a30183ac821507fbdaa6f0cc2c6ef25ca2677867
Parents: 5736247
Author: Andrey Novikov <an...@gridgain.com>
Authored: Mon Feb 20 18:23:33 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 2 10:47:15 2017 +0700
----------------------------------------------------------------------
.../visor/cache/VisorCacheClearTask.java | 88 +++++---------------
.../visor/compute/VisorGatewayTask.java | 30 ++++++-
2 files changed, 49 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a30183ac/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 1f1a6fb..0c8476f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.visor.cache;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.JobContextResource;
@@ -90,17 +88,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
}
/**
- * @param subJob Sub job to execute asynchronously.
+ * @param fut Future for asynchronous cache operation.
* @param idx Index.
* @return {@code true} If the future was not completed and this job should be suspended.
*/
- private boolean callAsync(IgniteCallable<Integer> subJob, int idx) {
- IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync();
-
- compute.call(subJob);
-
- IgniteFuture<Integer> fut = compute.future();
-
+ private boolean callAsync(IgniteFuture<Integer> fut, int idx) {
futs[idx] = fut;
if (fut.isDone())
@@ -119,16 +111,28 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
futs = new IgniteFuture[3];
if (futs[0] == null || futs[1] == null || futs[2] == null) {
- IgniteCache cache = ignite.cache(cacheName);
+ IgniteCache cache = ignite.cache(cacheName).withAsync();
+
+ if (futs[0] == null) {
+ cache.size(CachePeekMode.PRIMARY);
+
+ if (callAsync(cache.<Integer>future(), 0))
+ return null;
+ }
- if (futs[0] == null && callAsync(new VisorCacheSizeCallable(cache), 0))
- return null;
+ if (futs[1] == null) {
+ cache.clear();
- if (futs[1] == null && callAsync(new VisorCacheClearCallable(cache), 1))
- return null;
+ if (callAsync(cache.<Integer>future(), 1))
+ return null;
+ }
+
+ if (futs[2] == null) {
+ cache.size(CachePeekMode.PRIMARY);
- if (futs[2] == null && callAsync(new VisorCacheSizeCallable(cache), 2))
- return null;
+ if (callAsync(cache.<Integer>future(), 2))
+ return null;
+ }
}
assert futs[0].isDone() && futs[1].isDone() && futs[2].isDone();
@@ -141,54 +145,4 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
return S.toString(VisorCacheClearJob.class, this);
}
}
-
- /**
- * Callable to get cache size.
- */
- @GridInternal
- private static class VisorCacheSizeCallable implements IgniteCallable<Integer> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteCache cache;
-
- /**
- * @param cache Cache to take size from.
- */
- private VisorCacheSizeCallable(IgniteCache cache) {
- this.cache = cache;
- }
-
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- return cache.size(CachePeekMode.PRIMARY);
- }
- }
-
- /**
- * Callable to clear cache.
- */
- @GridInternal
- private static class VisorCacheClearCallable implements IgniteCallable<Integer> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteCache cache;
-
- /**
- * @param cache Cache to clear.
- */
- private VisorCacheClearCallable(IgniteCache cache) {
- this.cache = cache;
- }
-
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- cache.clear();
-
- return 0;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a30183ac/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
index 2539a26..a64ec6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java
@@ -29,21 +29,26 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
import org.jetbrains.annotations.Nullable;
/**
@@ -101,9 +106,16 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
@IgniteInstanceResource
protected transient IgniteEx ignite;
+ /** Auto-inject job context. */
+ @JobContextResource
+ protected transient ComputeJobContext jobCtx;
+
/** Arguments count. */
private final int argsCnt;
+ /** Future for spawned task. */
+ private transient IgniteFuture fut;
+
/**
* Create job with specified argument.
*
@@ -284,6 +296,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Object execute() throws IgniteException {
+ if (fut != null)
+ return fut.get();
+
String nidsArg = argument(0);
String taskName = argument(1);
@@ -355,8 +370,19 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> {
}
}
- return ignite.compute(ignite.cluster().forNodeIds(nids))
- .execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+ IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync();
+
+ comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false));
+
+ fut = comp.future();
+
+ fut.listen(new CI1<IgniteFuture<Object>>() {
+ @Override public void apply(IgniteFuture<Object> f) {
+ jobCtx.callcc();
+ }
+ });
+
+ return jobCtx.holdcc();
}
}
}
[12/13] ignite git commit: IGNITE-4717 VisorClearTask minor fix.
(cherry picked from commit d4b87f4)
Posted by nt...@apache.org.
IGNITE-4717 VisorClearTask minor fix.
(cherry picked from commit d4b87f4)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcb13982
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcb13982
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcb13982
Branch: refs/heads/master
Commit: bcb139822afa148a7ea3fbb3eecc274f308070f6
Parents: 7ad8e79
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Mar 10 15:51:38 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Mar 10 16:01:45 2017 +0700
----------------------------------------------------------------------
.../visor/cache/VisorCacheClearTask.java | 57 +++++++++++++++++++-
1 file changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb13982/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
index 0c8476f..ce74f17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.JobContextResource;
@@ -145,4 +146,58 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple<
return S.toString(VisorCacheClearJob.class, this);
}
}
-}
\ No newline at end of file
+
+ /**
+ * Callable to get cache size.
+ *
+ * @deprecated This class is needed only for compatibility.
+ */
+ @GridInternal @Deprecated
+ private static class VisorCacheSizeCallable implements IgniteCallable<Integer> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteCache cache;
+
+ /**
+ * @param cache Cache to take size from.
+ */
+ private VisorCacheSizeCallable(IgniteCache cache) {
+ this.cache = cache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ return cache.size(CachePeekMode.PRIMARY);
+ }
+ }
+
+ /**
+ * Callable to clear cache.
+ *
+ * @deprecated This class is needed only for compatibility.
+ */
+ @GridInternal @Deprecated
+ private static class VisorCacheClearCallable implements IgniteCallable<Integer> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final IgniteCache cache;
+
+ /**
+ * @param cache Cache to clear.
+ */
+ private VisorCacheClearCallable(IgniteCache cache) {
+ this.cache = cache;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ cache.clear();
+
+ return 0;
+ }
+ }
+}
[02/13] ignite git commit: IGNITE-3429: Added BinaryResolver
configuration samples for org.hibernate.cache.spi.CacheKey. This closes
#1516.
Posted by nt...@apache.org.
IGNITE-3429: Added BinaryResolver configuration samples for org.hibernate.cache.spi.CacheKey. This closes #1516.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05788b31
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05788b31
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05788b31
Branch: refs/heads/master
Commit: 05788b3188b30b5a3b39a75fe66301e03658408f
Parents: 8874f99
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Feb 17 12:14:53 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Feb 17 12:14:53 2017 +0300
----------------------------------------------------------------------
.../Hibernate5CacheKeyTypeConfiguration.java | 52 ++++++++++++++++++++
.../HibernateCacheKeyTypeConfiguration.java | 51 +++++++++++++++++++
2 files changed, 103 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/05788b31/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
new file mode 100644
index 0000000..886f69b
--- /dev/null
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.hibernate.config;
+
+import java.util.Objects;
+import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
+import org.apache.ignite.binary.BinaryIdentityResolver;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+
+/**
+ * This configuration provides a correct {@link BinaryIdentityResolver} implementation
+ * so that the Hibernate CacheKey class can be used as a key object.
+ *
+ * Note: for Hibernate version < 5.0 {@link HibernateCacheKeyTypeConfiguration} should be used.
+
+ */
+public class Hibernate5CacheKeyTypeConfiguration extends BinaryTypeConfiguration {
+
+ /** Creates the type configuration and installs an identity resolver based on the {@code id} field. */
+ public Hibernate5CacheKeyTypeConfiguration() {
+ super("org.hibernate.cache.internal.CacheKeyImplementation");
+
+ setIdentityResolver(new BinaryAbstractIdentityResolver() {
+ @Override protected int hashCode0(BinaryObject obj) {
+ return obj.field("id").hashCode();
+ }
+
+ @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
+ Object obj0 = o1.field("id");
+ Object obj1 = o2.field("id");
+
+ return Objects.equals(obj0, obj1);
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05788b31/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
new file mode 100644
index 0000000..c54292e
--- /dev/null
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.hibernate.config;
+
+import java.util.Objects;
+import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
+import org.apache.ignite.binary.BinaryIdentityResolver;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+
+/**
+ * This configuration provides a correct {@link BinaryIdentityResolver} implementation
+ * so that the Hibernate CacheKey class can be used as a key object.
+ *
+ * Note: for Hibernate version >= 5.0 {@link Hibernate5CacheKeyTypeConfiguration} should be used.
+ */
+public class HibernateCacheKeyTypeConfiguration extends BinaryTypeConfiguration {
+
+ /** {@inheritDoc} */
+ public HibernateCacheKeyTypeConfiguration() {
+ super("org.hibernate.cache.spi.CacheKey");
+
+ setIdentityResolver(new BinaryAbstractIdentityResolver() {
+ @Override protected int hashCode0(BinaryObject obj) {
+ return obj.field("key").hashCode();
+ }
+
+ @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
+ Object obj0 = o1.field("key");
+ Object obj1 = o2.field("key");
+
+ return Objects.equals(obj0, obj1);
+ }
+ });
+ }
+}
[04/13] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.7.8' into ignite-1.7.8
Posted by nt...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.7.8' into ignite-1.7.8
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/382fbc9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/382fbc9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/382fbc9d
Branch: refs/heads/master
Commit: 382fbc9d0b55f794eb4a9045fe72ca06b480062f
Parents: 1f881aa 05788b3
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Feb 17 12:35:18 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Feb 17 12:35:18 2017 +0300
----------------------------------------------------------------------
.../Hibernate5CacheKeyTypeConfiguration.java | 52 ++++++++++++++++++++
.../HibernateCacheKeyTypeConfiguration.java | 51 +++++++++++++++++++
2 files changed, 103 insertions(+)
----------------------------------------------------------------------
[03/13] ignite git commit: IGNITE-4147 - Throw exception on
connecting node to cluster with different SSL configuration
Posted by nt...@apache.org.
IGNITE-4147 - Throw exception on connecting node to cluster with different SSL configuration
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f881aa7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f881aa7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f881aa7
Branch: refs/heads/master
Commit: 1f881aa70a3894af01135f4cc5e341a8130462c2
Parents: 8874f99
Author: dkarachentsev <dk...@gridgain.com>
Authored: Fri Feb 17 12:34:41 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Fri Feb 17 12:34:41 2017 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ClientImpl.java | 30 ++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 19 ++
.../TcpDiscoverySslSecuredUnsecuredTest.java | 185 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
4 files changed, 235 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 932e7d1..95e2cda 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -44,6 +45,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
@@ -587,6 +589,8 @@ class ClientImpl extends TcpDiscoveryImpl {
int connectAttempts = 1;
+ int sslConnectAttempts = 3;
+
UUID locNodeId = getLocalNodeId();
IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -662,6 +666,22 @@ class ClientImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (X.hasCause(e, SSLException.class)) {
+ if (--sslConnectAttempts == 0)
+ throw new IgniteSpiException("Unable to establish secure connection. " +
+ "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+ continue;
+ }
+
+ if (X.hasCause(e, StreamCorruptedException.class)) {
+ if (--sslConnectAttempts == 0)
+ throw new IgniteSpiException("Unable to establish plain connection. " +
+ "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+ continue;
+ }
+
if (timeoutHelper.checkFailureTimeoutReached(e))
break;
@@ -1593,7 +1613,15 @@ class ClientImpl extends TcpDiscoveryImpl {
joinCnt++;
- T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+ T2<SocketStream, Boolean> joinRes;
+ try {
+ joinRes = joinTopology(false, spi.joinTimeout);
+ }
+ catch (IgniteSpiException e) {
+ joinError(e);
+
+ return;
+ }
if (joinRes == null) {
if (join)
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d462ac2..4600be0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
+import java.io.StreamCorruptedException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -1109,6 +1110,8 @@ class ServerImpl extends TcpDiscoveryImpl {
int connectAttempts = 1;
+ int sslConnectAttempts = 3;
+
boolean joinReqSent;
UUID locNodeId = getLocalNodeId();
@@ -1220,6 +1223,22 @@ class ServerImpl extends TcpDiscoveryImpl {
errs.add(e);
+ if (X.hasCause(e, SSLException.class)) {
+ if (--sslConnectAttempts == 0)
+ throw new IgniteException("Unable to establish secure connection. " +
+ "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+ continue;
+ }
+
+ if (X.hasCause(e, StreamCorruptedException.class)) {
+ if (--sslConnectAttempts == 0)
+ throw new IgniteException("Unable to establish plain connection. " +
+ "Was remote cluster configured with SSL? [rmtAddr=" + addr + ", errMsg=\"" + e.getMessage() + "\"]", e);
+
+ continue;
+ }
+
if (timeoutHelper.checkFailureTimeoutReached(e))
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
new file mode 100644
index 0000000..ca34f77
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySslSecuredUnsecuredTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StreamCorruptedException;
+import java.net.Socket;
+import java.util.concurrent.Callable;
+import javax.net.ssl.SSLException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests cases where a node connects to a cluster that has a different SSL configuration.
+ * An exception with a meaningful message should be thrown.
+ */
+public class TcpDiscoverySslSecuredUnsecuredTest extends GridCommonAbstractTest {
+ /** */
+ private volatile TcpDiscoverySpi spi;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception {
+ final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setClientMode(gridName.contains("client"));
+
+ if (gridName.contains("ssl"))
+ cfg.setSslContextFactory(GridTestUtils.sslFactory());
+
+ if (spi != null) {
+ final TcpDiscoveryIpFinder finder = ((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder();
+
+ spi.setIpFinder(finder);
+
+ cfg.setDiscoverySpi(spi);
+ }
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSecuredUnsecuredServerConnection() throws Exception {
+ checkConnection("plain-server", "ssl-server");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUnsecuredSecuredServerConnection() throws Exception {
+ checkConnection("ssl-server", "plain-server");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSecuredClientUnsecuredServerConnection() throws Exception {
+ checkConnection("plain-server", "ssl-client");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUnsecuredClientSecuredServerConnection() throws Exception {
+ checkConnection("ssl-server", "plain-client");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPlainServerNodesRestart() throws Exception {
+ checkNodesRestart("plain-server-1", "plain-server-2");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSslServerNodesRestart() throws Exception {
+ checkNodesRestart("ssl-server-1", "ssl-server-2");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPlainClientNodesRestart() throws Exception {
+ checkNodesRestart("plain-server", "plain-client");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSslClientNodesRestart() throws Exception {
+ checkNodesRestart("ssl-server", "ssl-client");
+ }
+
+ /**
+ * @param name1 First grid name.
+ * @param name2 Second grid name.
+ * @throws Exception If failed.
+ */
+ private void checkNodesRestart(String name1, String name2) throws Exception {
+ startGrid(name1);
+
+ spi = new FailDiscoverySpi(!name1.contains("ssl"));
+
+ startGrid(name2);
+ }
+
+ /**
+ * @param name1 First grid name.
+ * @param name2 Second grid name.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ThrowableNotThrown")
+ private void checkConnection(final String name1, final String name2) throws Exception {
+ startGrid(name1);
+
+ GridTestUtils.assertThrows(null, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ startGrid(name2);
+
+ return null;
+ }
+ }, IgniteCheckedException.class, null);
+ }
+
+ /**
+ *
+ */
+ private class FailDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private int cnt = 1;
+
+ /** */
+ private final boolean plain;
+
+ /**
+ * @param plain Plain connection flag.
+ */
+ private FailDiscoverySpi(final boolean plain) {
+ this.plain = plain;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected <T> T readMessage(final Socket sock, @Nullable final InputStream in,
+ final long timeout) throws IOException, IgniteCheckedException {
+ if (cnt-- > 0) {
+ if (plain)
+ throw new StreamCorruptedException("Test exception");
+ else
+ throw new SSLException("Test SSL exception");
+ }
+
+ return super.readMessage(sock, in, timeout);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f881aa7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 548e1a5..e6b39f7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiConfigSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiFailureTimeoutSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpiStartStopSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest;
@@ -89,6 +90,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
// SSL.
suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class));
+ suite.addTest(new TestSuite(TcpDiscoverySslSecuredUnsecuredTest.class));
return suite;
}
[06/13] ignite git commit: IGNITE-4624: Scan query optimization. This
closes #1509.
Posted by nt...@apache.org.
IGNITE-4624: Scan query optimization. This closes #1509.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f57760d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f57760d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f57760d
Branch: refs/heads/master
Commit: 2f57760dbb4fba948cd035498d2c7f71869c0665
Parents: 11bbec4
Author: Andrey V. Mashenkov <an...@gmail.com>
Authored: Fri Feb 17 16:15:31 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Fri Feb 17 18:47:21 2017 +0300
----------------------------------------------------------------------
.../distributed/dht/GridDhtCacheAdapter.java | 19 +++-
.../cache/query/GridCacheQueryManager.java | 97 ++++++++++----------
2 files changed, 64 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f57760d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index dcd379a..be7fa55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1247,14 +1247,27 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
final boolean backup,
final boolean keepBinary,
final AffinityTopologyVersion topVer) {
+
+ return iterator(localEntriesIteratorEx(primary, backup, topVer), !keepBinary);
+ }
+
+ /**
+ * @param primary If {@code true} includes primary entries.
+ * @param backup If {@code true} includes backup entries.
+ * @param topVer Specified affinity topology version.
+ * @return Local entries iterator.
+ */
+ public Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary,
+ final boolean backup,
+ final AffinityTopologyVersion topVer) {
assert primary || backup;
if (primary && backup)
- return iterator(entries().iterator(), !keepBinary);
+ return entries().iterator();
else {
final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator();
- Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() {
+ return new Iterator<GridCacheMapEntry>() {
private GridCacheMapEntry next;
private Iterator<GridCacheMapEntry> curIt;
@@ -1311,8 +1324,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
while (partIt.hasNext());
}
};
-
- return iterator(it, !keepBinary);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2f57760d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d64dff4..14b1106 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1033,23 +1033,25 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @throws GridDhtUnreservedPartitionException If failed to reserve partition.
*/
private GridIterator<IgniteBiTuple<K, V>> onheapIterator(
- GridCacheQueryAdapter<?> qry,
+ final GridCacheQueryAdapter<?> qry,
AffinityTopologyVersion topVer,
final IgniteBiPredicate<K, V> keyValFilter,
- boolean backups,
+ final boolean backups,
final ExpiryPolicy plc,
final boolean locNode) throws GridDhtUnreservedPartitionException {
- Iterator<K> keyIter;
+ Iterator<? extends GridCacheEntryEx> entryIter;
GridDhtLocalPartition locPart = null;
Integer part = qry.partition();
- if (part == null || cctx.isLocal()) {
- // Performance optimization.
- if (locNode && plc == null && !cctx.isLocal()) {
- GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+ if (cctx.isLocal())
+ entryIter = cctx.local().allEntries().iterator();
+ else if (part == null) {
+ GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+ // Performance optimization.
+ if (locNode && plc == null) {
final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true,
backups, cache.context().keepBinary(), topVer);
@@ -1099,12 +1101,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
};
}
- IgniteInternalCache<K, V> keepBinaryCache = cctx.cache().keepBinary();
-
- keyIter = backups ? keepBinaryCache.keySetx().iterator() : keepBinaryCache.primaryKeySet().iterator();
+ entryIter = cache.localEntriesIteratorEx(true, backups, topVer);
}
else if (part < 0 || part >= cctx.affinity().partitions())
- keyIter = new GridEmptyIterator<>();
+ return new GridEmptyIterator<>();
else {
final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
@@ -1115,28 +1115,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
"Partition can not be reserved.");
- final GridDhtLocalPartition locPart0 = locPart;
-
- keyIter = new Iterator<K>() {
- private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
-
- @Override public boolean hasNext() {
- return iter0.hasNext();
- }
-
- @Override public K next() {
- return (K)iter0.next();
- }
-
- @Override public void remove() {
- iter0.remove();
- }
- };
+ entryIter = locPart.allEntries().iterator();
}
final GridDhtLocalPartition locPart0 = locPart;
- return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
+ return new PeekValueExpiryAwareIterator(entryIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
@Override protected void onClose() {
super.onClose();
@@ -1263,18 +1247,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
ExpiryPolicy expPlc,
final boolean keepBinary,
boolean locNode) {
- Iterator<K> keyIter = new Iterator<K>() {
+ Iterator<? extends GridCacheEntryEx> keyIter = new Iterator<GridCacheEntryEx>() {
/** {@inheritDoc} */
@Override public boolean hasNext() {
return it.hasNext();
}
/** {@inheritDoc} */
- @Override public K next() {
+ @Override public GridCacheEntryEx next() {
try {
KeyCacheObject key = cctx.toCacheKeyObject(it.next().getKey());
- return (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
+ final GridCacheEntryEx entryEx = cctx.cache().entryEx(key);
+
+ return entryEx;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -2189,8 +2175,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
- * Gets cache queries detailed metrics.
- * Detail metrics could be enabled by setting non-zero value via {@link CacheConfiguration#setQueryDetailMetricsSize(int)}
+ * Gets cache queries detailed metrics. Detail metrics could be enabled by setting non-zero value via {@link
+ * CacheConfiguration#setQueryDetailMetricsSize(int)}
*
* @return Cache queries metrics aggregated by query type and query text.
*/
@@ -3091,8 +3077,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> {
/** Absolute position of each recipient. */
private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1);
+
/** */
private CircularQueue<R> queue;
+
/** */
private int pruned;
@@ -3529,10 +3517,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private IgniteCacheExpiryPolicy expiryPlc;
/** */
- private Iterator<K> keyIt;
+ private Iterator<? extends GridCacheEntryEx> entryIt;
/**
- * @param keyIt Key iterator.
+ * @param entryIter Entry iterator.
* @param plc Expiry policy.
* @param topVer Topology version.
* @param keyValFilter Key-value filter.
@@ -3540,8 +3528,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param locNode Local node.
* @param heapOnly Heap only.
*/
- private PeekValueExpiryAwareIterator(
- Iterator<K> keyIt,
+ PeekValueExpiryAwareIterator(
+ Iterator<? extends GridCacheEntryEx> entryIter,
ExpiryPolicy plc,
AffinityTopologyVersion topVer,
IgniteBiPredicate<K, V> keyValFilter,
@@ -3549,7 +3537,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
boolean locNode,
boolean heapOnly
) {
- this.keyIt = keyIt;
+ this.entryIt = entryIter;
this.plc = plc;
this.topVer = topVer;
this.keyValFilter = keyValFilter;
@@ -3593,15 +3581,27 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
private void advance() {
IgniteBiTuple<K, V> next0 = null;
- while (keyIt.hasNext()) {
+ while (entryIt.hasNext()) {
next0 = null;
- K key = keyIt.next();
+ GridCacheEntryEx entry = entryIt.next();
+
+ if (entry.deleted())
+ continue;
+ KeyCacheObject key = entry.key();
CacheObject val;
try {
- val = value(key);
+ if (heapOnly)
+ val = entry.peek(true, false, false, expiryPlc);
+ else
+ val = value(entry, entry.key());
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ assert heapOnly;
+
+ continue;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
@@ -3664,23 +3664,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
/**
+ * @param entry Entry.
* @param key Key.
* @return Value.
* @throws IgniteCheckedException If failed to peek value.
*/
- private CacheObject value(K key) throws IgniteCheckedException {
+ private CacheObject value(GridCacheEntryEx entry, KeyCacheObject key) throws IgniteCheckedException {
while (true) {
try {
- GridCacheEntryEx entry = heapOnly ? cache.peekEx(key) : cache.entryEx(key);
+ if (entry == null)
+ entry = cache.entryEx(key);
- if (expiryPlc != null && !heapOnly)
+ if (expiryPlc != null)
entry.unswap();
- return entry != null ? entry.peek(true, !heapOnly, !heapOnly, topVer, expiryPlc) : null;
+ return entry.peek(true, true, true, topVer, expiryPlc);
}
catch (GridCacheEntryRemovedException ignore) {
- if (heapOnly)
- return null;
+ entry = null;
}
}
}
[07/13] ignite git commit: IGNITE-3429 - Rollback due to broken
compilation
Posted by nt...@apache.org.
IGNITE-3429 - Rollback due to broken compilation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c0e2df26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c0e2df26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c0e2df26
Branch: refs/heads/master
Commit: c0e2df26f056cd11690d821146f05e3fd938906e
Parents: 2f57760
Author: dkarachentsev <dk...@gridgain.com>
Authored: Mon Feb 20 11:17:35 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Mon Feb 20 11:17:35 2017 +0300
----------------------------------------------------------------------
.../Hibernate5CacheKeyTypeConfiguration.java | 52 --------------------
.../HibernateCacheKeyTypeConfiguration.java | 51 -------------------
2 files changed, 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e2df26/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
deleted file mode 100644
index 886f69b..0000000
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/Hibernate5CacheKeyTypeConfiguration.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.hibernate.config;
-
-import java.util.Objects;
-import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
-import org.apache.ignite.binary.BinaryIdentityResolver;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryTypeConfiguration;
-
-/**
- * This configuration provides correct {@link BinaryIdentityResolver} implementation
- * for Hibernate CacheKey class can be used as a key object.
- *
- * Note: for Hibernate version < 5.0 {@link HibernateCacheKeyTypeConfiguration} should be used.
-
- */
-public class Hibernate5CacheKeyTypeConfiguration extends BinaryTypeConfiguration {
-
- /** {@inheritDoc} */
- public Hibernate5CacheKeyTypeConfiguration() {
- super("org.hibernate.cache.internal.CacheKeyImplementation");
-
- setIdentityResolver(new BinaryAbstractIdentityResolver() {
- @Override protected int hashCode0(BinaryObject obj) {
- return obj.field("id").hashCode();
- }
-
- @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
- Object obj0 = o1.field("id");
- Object obj1 = o2.field("id");
-
- return Objects.equals(obj0, obj1);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c0e2df26/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
deleted file mode 100644
index c54292e..0000000
--- a/modules/hibernate/src/main/java/org/apache/ignite/cache/hibernate/config/HibernateCacheKeyTypeConfiguration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.hibernate.config;
-
-import java.util.Objects;
-import org.apache.ignite.binary.BinaryAbstractIdentityResolver;
-import org.apache.ignite.binary.BinaryIdentityResolver;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryTypeConfiguration;
-
-/**
- * This configuration provides correct {@link BinaryIdentityResolver} implementation
- * for Hibernate CacheKey class can be used as a key object.
- *
- * Note: for Hibernate version >= 5.0 {@link Hibernate5CacheKeyTypeConfiguration} should be used.
- */
-public class HibernateCacheKeyTypeConfiguration extends BinaryTypeConfiguration {
-
- /** {@inheritDoc} */
- public HibernateCacheKeyTypeConfiguration() {
- super("org.hibernate.cache.spi.CacheKey");
-
- setIdentityResolver(new BinaryAbstractIdentityResolver() {
- @Override protected int hashCode0(BinaryObject obj) {
- return obj.field("key").hashCode();
- }
-
- @Override protected boolean equals0(BinaryObject o1, BinaryObject o2) {
- Object obj0 = o1.field("key");
- Object obj1 = o2.field("key");
-
- return Objects.equals(obj0, obj1);
- }
- });
- }
-}
[13/13] ignite git commit: Merge ignite-1.7.9 into master
Posted by nt...@apache.org.
Merge ignite-1.7.9 into master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84880a81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84880a81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84880a81
Branch: refs/heads/master
Commit: 84880a8108a586604ab00d7bf48ba6c9f8f658ee
Parents: 94c1e7c bcb1398
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 16 16:34:28 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 16 16:34:36 2017 +0300
----------------------------------------------------------------------
.../internal/managers/GridManagerAdapter.java | 4 +
.../cache/DynamicCacheChangeBatch.java | 14 ++
.../service/GridServiceProcessor.java | 49 ++---
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 5 +
.../org/apache/ignite/spi/IgniteSpiContext.java | 6 +
.../ignite/spi/discovery/tcp/ClientImpl.java | 32 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 19 ++
.../spi/discovery/tcp/TcpDiscoverySpi.java | 5 +
.../tcp/internal/TcpDiscoveryNode.java | 6 +-
.../GridServiceContinuousQueryRedeploy.java | 167 +++++++++++++++++
...veryNodeAttributesUpdateOnReconnectTest.java | 110 +++++++++++
.../TcpDiscoverySslSecuredUnsecuredTest.java | 185 +++++++++++++++++++
.../tcp/TestReconnectPluginProvider.java | 111 +++++++++++
.../discovery/tcp/TestReconnectProcessor.java | 93 ++++++++++
.../testframework/GridSpiTestContext.java | 5 +
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
.../IgniteSpiDiscoverySelfTestSuite.java | 5 +
.../org.apache.ignite.plugin.PluginProvider | 1 +
.../processors/query/h2/IgniteH2Indexing.java | 1 +
parent/pom.xml | 1 +
20 files changed, 793 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 99146aa,4eeafed..74cca8e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -1586,16 -1576,19 +1576,22 @@@ public class GridServiceProcessor exten
if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
return;
}
+ else if (msg instanceof DynamicCacheChangeBatch) {
+ if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+ return;
+ }
+ else
+ return;
}
else
- topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
+ topVer = new AffinityTopologyVersion((evt).topologyVersion(), 0);
- depExe.submit(new BusyRunnable() {
+ depExe.execute(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
+ // In case the cache instance isn't tracked by DiscoveryManager anymore.
+ discoCache.updateAlives(ctx.discovery());
+
+ ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 41035ec,5977702..df27868
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@@ -141,7 -144,7 +142,8 @@@ public class IgniteKernalSelfTestSuite
suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
suite.addTestSuite(IgniteServiceReassignmentTest.class);
suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
+ suite.addTestSuite(IgniteServiceDynamicCachesSelfTest.class);
+ suite.addTestSuite(GridServiceContinuousQueryRedeploy.class);
suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 46fbb9e,62b47b8..7b188bf
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -83,9 -80,7 +83,10 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
http://git-wip-us.apache.org/repos/asf/ignite/blob/84880a81/parent/pom.xml
----------------------------------------------------------------------
[09/13] ignite git commit: Implemented support for enforce join order
flag. (cherry picked from commit a7f77d4)
Posted by nt...@apache.org.
Implemented support for enforce join order flag.
(cherry picked from commit a7f77d4)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57362479
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57362479
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57362479
Branch: refs/heads/master
Commit: 573624796b171b2420b87657598198f40a91f6bb
Parents: 9fcb3e7
Author: Alexey Kuznetsov <ak...@gridgain.com>
Authored: Wed Mar 1 22:09:40 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Thu Mar 2 10:07:25 2017 +0700
----------------------------------------------------------------------
.../internal/visor/query/VisorQueryArgV3.java | 51 ++++++++++++++++++++
.../internal/visor/query/VisorQueryJob.java | 6 +--
.../resources/META-INF/classnames.properties | 1 +
3 files changed, 55 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/57362479/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
new file mode 100644
index 0000000..f32c00a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArgV3.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+/**
+ * Arguments for {@link VisorQueryTask}.
+ */
+public class VisorQueryArgV3 extends VisorQueryArgV2 {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Enforce join order flag. */
+ private final boolean enforceJoinOrder;
+
+ /**
+ * @param cacheName Cache name for query.
+ * @param qryTxt Query text.
+ * @param distributedJoins If {@code true} then distributed joins enabled.
+ * @param enforceJoinOrder If {@code true} then enforce join order.
+ * @param loc Flag whether to execute query locally.
+ * @param pageSize Result batch size.
+ */
+ public VisorQueryArgV3(String cacheName, String qryTxt,
+ boolean distributedJoins, boolean enforceJoinOrder, boolean loc, int pageSize) {
+ super(cacheName, qryTxt, distributedJoins, loc, pageSize);
+
+ this.enforceJoinOrder = enforceJoinOrder;
+ }
+
+ /**
+ * @return Enforce join order flag.
+ */
+ public boolean enforceJoinOrder() {
+ return enforceJoinOrder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57362479/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index c66b2dd..1ac90ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -131,9 +131,8 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
if (scanWithFilter) {
boolean caseSensitive = qryTxt.startsWith(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE);
- String ptrn = caseSensitive
- ? qryTxt.substring(SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length())
- : qryTxt.substring(SCAN_CACHE_WITH_FILTER.length());
+ String ptrn = qryTxt.substring(
+ caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE.length() : SCAN_CACHE_WITH_FILTER.length());
filter = new VisorQueryScanSubstringFilter(caseSensitive, ptrn);
}
@@ -162,6 +161,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
qry.setPageSize(arg.pageSize());
qry.setLocal(arg.local());
qry.setDistributedJoins(arg instanceof VisorQueryArgV2 && ((VisorQueryArgV2)arg).distributedJoins());
+ qry.setEnforceJoinOrder(arg instanceof VisorQueryArgV3 && ((VisorQueryArgV3)arg).enforceJoinOrder());
long start = U.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/ignite/blob/57362479/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 8a6dc66..db486a5 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1792,6 +1792,7 @@ org.apache.ignite.internal.visor.node.VisorSpisConfiguration
org.apache.ignite.internal.visor.node.VisorTransactionConfiguration
org.apache.ignite.internal.visor.query.VisorQueryArg
org.apache.ignite.internal.visor.query.VisorQueryArgV2
+org.apache.ignite.internal.visor.query.VisorQueryArgV3
org.apache.ignite.internal.visor.query.VisorQueryCleanupTask
org.apache.ignite.internal.visor.query.VisorQueryCleanupTask$VisorQueryCleanupJob
org.apache.ignite.internal.visor.query.VisorQueryField
[11/13] ignite git commit: ignite-4577 Add non-reachable addresses at
the end of addresses list
Posted by nt...@apache.org.
ignite-4577 Add non-reachable addresses at the end of addresses list
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ad8e79f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ad8e79f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ad8e79f
Branch: refs/heads/master
Commit: 7ad8e79fa1077291c50f2f535ecccde6baee0321
Parents: a30183a
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Tue Mar 7 14:32:28 2017 +0300
Committer: Evgenii Zhuravlev <ez...@gridgain.com>
Committed: Tue Mar 7 14:41:38 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 14 ++++++-----
.../communication/tcp/TcpCommunicationSpi.java | 25 ++++++++++++++++++++
2 files changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ad8e79f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3fa3f7b..ba118cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1810,15 +1810,16 @@ public abstract class IgniteUtils {
/**
* @param addrs Addresses.
+ * @return List of reachable addresses.
*/
- public static List<InetAddress> filterReachable(List<InetAddress> addrs) {
+ public static List<InetAddress> filterReachable(Collection<InetAddress> addrs) {
final int reachTimeout = 2000;
if (addrs.isEmpty())
return Collections.emptyList();
if (addrs.size() == 1) {
- InetAddress addr = addrs.get(0);
+ InetAddress addr = F.first(addrs);
if (reachable(addr, reachTimeout))
return Collections.singletonList(addr);
@@ -1834,8 +1835,7 @@ public abstract class IgniteUtils {
for (final InetAddress addr : addrs) {
futs.add(executor.submit(new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
if (reachable(addr, reachTimeout)) {
synchronized (res) {
res.add(addr);
@@ -1848,11 +1848,13 @@ public abstract class IgniteUtils {
for (Future<?> fut : futs) {
try {
fut.get();
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IgniteException("Thread has been interrupted.", e);
- } catch (ExecutionException e) {
+ }
+ catch (ExecutionException e) {
throw new IgniteException(e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ad8e79f/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 94b7efe..81454f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2334,6 +2334,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (isExtAddrsExist)
addrs.addAll(extAddrs);
+ Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size());
+
+ for (InetSocketAddress addr : addrs)
+ allInetAddrs.add(addr.getAddress());
+
+ List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs);
+
+ if (reachableInetAddrs.size() < allInetAddrs.size()) {
+ LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size());
+
+ for (InetSocketAddress addr : addrs) {
+ if (reachableInetAddrs.contains(addr.getAddress()))
+ addrs0.add(addr);
+ }
+ for (InetSocketAddress addr : addrs) {
+ if (!reachableInetAddrs.contains(addr.getAddress()))
+ addrs0.add(addr);
+ }
+
+ addrs = addrs0;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']');
+
boolean conn = false;
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
[05/13] ignite git commit: IGNITE-4436 API for collecting list of
running queries and cancel them. (cherry picked from commit
49237343d53ee33d44e5599cd7fe7da868ee30a1)
Posted by nt...@apache.org.
IGNITE-4436 API for collecting list of running queries and cancel them.
(cherry picked from commit 49237343d53ee33d44e5599cd7fe7da868ee30a1)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11bbec48
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11bbec48
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11bbec48
Branch: refs/heads/master
Commit: 11bbec487dc174fac1acd6b50c940840305bc75a
Parents: 382fbc9
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Feb 17 17:57:50 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Feb 17 17:57:50 2017 +0700
----------------------------------------------------------------------
.../cache/query/GridCacheTwoStepQuery.java | 18 +-
.../processors/query/GridQueryIndexing.java | 17 +-
.../processors/query/GridQueryProcessor.java | 26 ++-
.../processors/query/GridRunningQueryInfo.java | 132 ++++++++++++
.../internal/visor/VisorMultiNodeTask.java | 2 +-
.../visor/query/VisorCancelQueriesTask.java | 72 +++++++
.../query/VisorCollectRunningQueriesTask.java | 96 +++++++++
.../internal/visor/query/VisorRunningQuery.java | 132 ++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 83 +++++++-
.../query/h2/sql/GridSqlQuerySplitter.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 60 +++++-
.../cache/CacheSqlQueryValueCopySelfTest.java | 208 +++++++++++++++++--
.../cache/GridCacheCrossCacheQuerySelfTest.java | 2 +-
.../h2/GridIndexingSpiAbstractSelfTest.java | 7 +
14 files changed, 821 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 8dcba2f..f53936f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -46,6 +46,9 @@ public class GridCacheTwoStepQuery {
private boolean explain;
/** */
+ private String originalSql;
+
+ /** */
private Collection<String> spaces;
/** */
@@ -67,10 +70,12 @@ public class GridCacheTwoStepQuery {
private List<Integer> extraCaches;
/**
+ * @param originalSql Original query SQL.
* @param schemas Schema names in query.
* @param tbls Tables in query.
*/
- public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+ public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) {
+ this.originalSql = originalSql;
this.schemas = schemas;
this.tbls = tbls;
}
@@ -196,6 +201,13 @@ public class GridCacheTwoStepQuery {
}
/**
+ * @return Original query SQL.
+ */
+ public String originalSql() {
+ return originalSql;
+ }
+
+ /**
* @return Spaces.
*/
public Collection<String> spaces() {
@@ -223,7 +235,7 @@ public class GridCacheTwoStepQuery {
public GridCacheTwoStepQuery copy(Object[] args) {
assert !explain;
- GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+ GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
cp.caches = caches;
cp.extraCaches = extraCaches;
@@ -250,4 +262,4 @@ public class GridCacheTwoStepQuery {
@Override public String toString() {
return S.toString(GridCacheTwoStepQuery.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 539ebc0..1cebbb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -224,7 +224,22 @@ public interface GridQueryIndexing {
public void onDisconnected(IgniteFuture<?> reconnectFut);
/**
+ * Collect queries that have already been running longer than the specified duration.
+ *
+ * @param duration Duration to check.
+ * @return Collection of long running queries.
+ */
+ public Collection<GridRunningQueryInfo> runningQueries(long duration);
+
+ /**
+ * Cancel specified queries.
+ *
+ * @param queries IDs of the queries to cancel.
+ */
+ public void cancelQueries(Collection<Long> queries);
+
+ /**
* Cancels all executing queries.
*/
public void cancelAllQueries();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index c2e5717..0a0d166 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -44,7 +44,6 @@ import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryType;
@@ -118,7 +117,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000;
/** */
- private static Set<Class<?>> SQL_TYPES = new HashSet<>(F.<Class<?>>asList(
+ private static final Set<Class<?>> SQL_TYPES = new HashSet<>(F.<Class<?>>asList(
Integer.class,
Boolean.class,
Byte.class,
@@ -913,6 +912,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Collects queries that have been running longer than the specified duration.
+ *
+ * @param duration Duration to check.
+ * @return Result of {@code idx.runningQueries(duration)}, or an empty list when {@code moduleEnabled()} is {@code false}.
+ */
+ public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+ if (moduleEnabled())
+ return idx.runningQueries(duration);
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * Cancels the specified queries; does nothing when {@code moduleEnabled()} is {@code false}.
+ *
+ * @param queries IDs of the queries to cancel.
+ */
+ public void cancelQueries(Collection<Long> queries) {
+ if (moduleEnabled())
+ idx.cancelQueries(queries);
+ }
+
+ /**
* @param sqlQry Sql query.
* @param params Params.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
new file mode 100644
index 0000000..d77c8c0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
+/**
+ * Descriptor of a running query.
+ */
+public class GridRunningQueryInfo {
+ /** Query ID. */
+ private final long id;
+
+ /** Query text. */
+ private final String qry;
+
+ /** Query type. */
+ private final GridCacheQueryType qryType;
+
+ /** Cache where query was executed. */
+ private final String cache;
+
+ /** Query start time. */
+ private final long startTime;
+
+ /** Query cancel handler; {@code null} when the query cannot be cancelled. */
+ private final GridQueryCancel cancel;
+
+ /** Local query flag. */
+ private final boolean loc;
+
+ /**
+ * @param id Query ID; must not be {@code null} because it is unboxed into a primitive field.
+ * @param qry Query text.
+ * @param qryType Query type.
+ * @param cache Cache where query was executed.
+ * @param startTime Query start time.
+ * @param cancel Query cancel.
+ * @param loc Local query flag.
+ */
+ public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+ GridQueryCancel cancel, boolean loc) {
+ this.id = id;
+ this.qry = qry;
+ this.qryType = qryType;
+ this.cache = cache;
+ this.startTime = startTime;
+ this.cancel = cancel;
+ this.loc = loc;
+ }
+
+ /**
+ * @return Query ID.
+ */
+ public Long id() {
+ return id;
+ }
+
+ /**
+ * @return Query text.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @return Query type.
+ */
+ public GridCacheQueryType queryType() {
+ return qryType;
+ }
+
+ /**
+ * @return Cache where query was executed.
+ */
+ public String cache() {
+ return cache;
+ }
+
+ /**
+ * @return Query start time.
+ */
+ public long startTime() {
+ return startTime;
+ }
+
+ /**
+ * @param curTime Current time.
+ * @param duration Duration of long query.
+ * @return {@code true} if {@code curTime - startTime} is strictly greater than {@code duration}.
+ */
+ public boolean longQuery(long curTime, long duration) {
+ return curTime - startTime > duration;
+ }
+
+ /**
+ * Cancels the query; does nothing when no cancel handler was supplied.
+ */
+ public void cancel() {
+ if (cancel != null)
+ cancel.cancel();
+ }
+
+ /**
+ * @return {@code true} if query can be cancelled.
+ */
+ public boolean cancelable() {
+ return cancel != null;
+ }
+
+ /**
+ * @return {@code true} if query is local.
+ */
+ public boolean local() {
+ return loc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
index 57f1346..ece1a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
@@ -130,4 +130,4 @@ public abstract class VisorMultiNodeTask<A, R, J> implements ComputeTask<VisorTa
logFinish(ignite.log(), getClass(), start);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
new file mode 100644
index 0000000..a6f2d82
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorOneNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to cancel queries.
+ */
+@GridInternal
+public class VisorCancelQueriesTask extends VisorOneNodeTask<Collection<Long>, Void> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorCancelQueriesJob job(Collection<Long> arg) {
+ return new VisorCancelQueriesJob(arg, debug);
+ }
+
+ /** {@inheritDoc} Always returns {@code null}; the job results are not inspected. */
+ @Nullable @Override protected Void reduce0(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+
+ /**
+ * Job to cancel queries on node.
+ */
+ private static class VisorCancelQueriesJob extends VisorJob<Collection<Long>, Void> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job with specified argument.
+ *
+ * @param arg Job argument.
+ * @param debug Flag indicating whether debug information should be printed into node log.
+ */
+ protected VisorCancelQueriesJob(@Nullable Collection<Long> arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} Delegates to {@code ignite.context().query().cancelQueries(queries)} and returns {@code null}. */
+ @Override protected Void run(@Nullable Collection<Long> queries) throws IgniteException {
+ ignite.context().query().cancelQueries(queries);
+
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java
new file mode 100644
index 0000000..2b40e61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task to collect currently running queries.
+ */
+@GridInternal
+public class VisorCollectRunningQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorRunningQuery>>, Collection<VisorRunningQuery>> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorCollectRunningQueriesJob job(Long arg) {
+ return new VisorCollectRunningQueriesJob(arg, debug);
+ }
+
+ /** {@inheritDoc} Results are keyed by node ID; results of jobs that finished with an exception are skipped. */
+ @Nullable @Override protected Map<UUID, Collection<VisorRunningQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException {
+ Map<UUID, Collection<VisorRunningQuery>> map = new HashMap<>();
+
+ for (ComputeJobResult res : results)
+ if (res.getException() == null) {
+ Collection<VisorRunningQuery> queries = res.getData();
+
+ map.put(res.getNode().id(), queries);
+ }
+
+ return map;
+ }
+
+ /**
+ * Job to collect currently running queries from node.
+ */
+ private static class VisorCollectRunningQueriesJob extends VisorJob<Long, Collection<VisorRunningQuery>> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job with specified argument.
+ *
+ * @param arg Job argument.
+ * @param debug Flag indicating whether debug information should be printed into node log.
+ */
+ protected VisorCollectRunningQueriesJob(@Nullable Long arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} A {@code null} duration is treated as 0; each query's duration is measured against one timestamp. */
+ @Override protected Collection<VisorRunningQuery> run(@Nullable Long duration) throws IgniteException {
+ Collection<GridRunningQueryInfo> queries = ignite.context().query()
+ .runningQueries(duration != null ? duration : 0);
+
+ Collection<VisorRunningQuery> res = new ArrayList<>(queries.size());
+
+ long curTime = U.currentTimeMillis();
+
+ for (GridRunningQueryInfo qry : queries)
+ res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(),
+ qry.startTime(), curTime - qry.startTime(),
+ qry.cancelable(), qry.local()));
+
+ return res;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
new file mode 100644
index 0000000..fc6bc7a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
+/**
+ * Descriptor of running query.
+ */
+public class VisorRunningQuery implements Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Query ID. */
+ private long id;
+
+ /** Query text. */
+ private String qry;
+
+ /** Query type. */
+ private GridCacheQueryType qryType;
+
+ /** Cache name for query. */
+ private String cache;
+
+ /** Query start time. */
+ private long startTime;
+
+ /** Query current duration. */
+ private long duration;
+
+ /** {@code true} if query can be canceled. */
+ private boolean cancellable;
+
+ /** {@code true} if query is local. */
+ private boolean loc;
+
+ /**
+ * @param id Query ID.
+ * @param qry Query text.
+ * @param qryType Query type.
+ * @param cache Cache where query was executed.
+ * @param startTime Query start time.
+ * @param duration Query current duration.
+ * @param cancellable {@code true} if query can be canceled.
+ * @param loc {@code true} if query is local.
+ */
+ public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache,
+ long startTime, long duration,
+ boolean cancellable, boolean loc) {
+ this.id = id;
+ this.qry = qry;
+ this.qryType = qryType;
+ this.cache = cache;
+ this.startTime = startTime;
+ this.duration = duration;
+ this.cancellable = cancellable;
+ this.loc = loc;
+ }
+
+ /**
+ * @return Query ID.
+ */
+ public long getId() {
+ return id;
+ }
+
+ /**
+ * @return Query text.
+ */
+ public String getQuery() {
+ return qry;
+ }
+
+ /**
+ * @return Query type.
+ */
+ public GridCacheQueryType getQueryType() {
+ return qryType;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String getCache() {
+ return cache;
+ }
+
+ /**
+ * @return Query start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * @return Query duration.
+ */
+ public long getDuration() {
+ return duration;
+ }
+
+ /**
+ * @return {@code true} if query can be cancelled.
+ */
+ public boolean isCancelable() {
+ return cancellable;
+ }
+
+ /**
+ * @return {@code true} if query is local.
+ */
+ public boolean isLocal() {
+ return loc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cbf2ebd..62b47b8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -52,6 +52,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
@@ -79,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -172,6 +174,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getString;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
@@ -279,9 +284,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final Map<String, String> space2schema = new ConcurrentHashMap8<>();
/** */
+ private AtomicLong qryIdGen;
+
+ /** */
private GridSpinBusyLock busyLock;
/** */
+ private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>();
+
+ /** */
private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
@Nullable @Override public ConnectionWrapper get() {
ConnectionWrapper c = super.get();
@@ -751,8 +762,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IndexingQueryFilter filters) throws IgniteCheckedException {
TableDescriptor tbl = tableDescriptor(spaceName, type);
- if (tbl != null && tbl.luceneIdx != null)
- return tbl.luceneIdx.query(qry, filters);
+ if (tbl != null && tbl.luceneIdx != null) {
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, spaceName,
+ U.currentTimeMillis(), null, true);
+
+ try {
+ runs.put(run.id(), run);
+
+ return tbl.luceneIdx.query(qry, filters);
+ }
+ finally {
+ runs.remove(run.id());
+ }
+ }
return new GridEmptyCloseableIterator<>();
}
@@ -796,6 +818,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(ctx);
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS,
+ spaceName, U.currentTimeMillis(), cancel, true);
+
+ runs.putIfAbsent(run.id(), run);
+
try {
ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
@@ -803,6 +830,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
GridH2QueryContext.clearThreadLocal();
+
+ runs.remove(run.id());
}
}
};
@@ -1061,6 +1090,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
+ U.currentTimeMillis(), null, true);
+
+ runs.put(run.id(), run);
+
try {
ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
@@ -1068,6 +1102,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
GridH2QueryContext.clearThreadLocal();
+
+ runs.remove(run.id());
}
}
@@ -1692,6 +1728,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
this.busyLock = busyLock;
+ qryIdGen = new AtomicLong();
+
if (SysProperties.serializeJavaObject) {
U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1742,7 +1780,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
marshaller = ctx.config().getMarshaller();
mapQryExec = new GridMapQueryExecutor(busyLock);
- rdcQryExec = new GridReduceQueryExecutor(busyLock);
+ rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock);
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
@@ -2196,6 +2234,37 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return cols;
}
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+ Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+ res.addAll(runs.values());
+ res.addAll(rdcQryExec.longRunningQueries(duration));
+
+ return res;
+ }
+
+ /** {@inheritDoc} No-op when {@code F.isEmpty(queries)}; otherwise cancels matching {@code runs} entries, then calls {@code rdcQryExec}. */
+ @Override public void cancelQueries(Collection<Long> queries) {
+ if (!F.isEmpty(queries)) {
+ for (Long qryId : queries) {
+ GridRunningQueryInfo run = runs.get(qryId);
+
+ if (run != null)
+ run.cancel();
+ }
+
+ rdcQryExec.cancelQueries(queries);
+ }
+ }
+
+ /** {@inheritDoc} Implemented by closing every connection in {@code conns}. */
+ @Override public void cancelAllQueries() {
+ for (Connection conn : conns)
+ U.close(conn, log);
+ }
+
/**
* Wrapper to store connection and flag is schema set or not.
*/
@@ -3086,10 +3155,4 @@ public class IgniteH2Indexing implements GridQueryIndexing {
lastUsage = U.currentTimeMillis();
}
}
-
- /** {@inheritDoc} */
- @Override public void cancelAllQueries() {
- for (Connection conn : conns)
- U.close(conn, log);
- }
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 7d43bf6..8284c45 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -170,7 +170,7 @@ public class GridSqlQuerySplitter {
qry = collectAllTables(qry, schemas, tbls);
// Build resulting two step query.
- GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls);
+ GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(qry.getSQL(), schemas, tbls);
// Map query will be direct reference to the original query AST.
// Thus all the modifications will be performed on the original AST, so we should be careful when
@@ -954,4 +954,4 @@ public class GridSqlQuerySplitter {
private static GridSqlFunction function(GridSqlFunctionType type) {
return new GridSqlFunction(type);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 40b11973..3f886ee 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
@@ -99,6 +100,7 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
/**
@@ -121,7 +123,7 @@ public class GridReduceQueryExecutor {
private IgniteLogger log;
/** */
- private final AtomicLong reqIdGen = new AtomicLong();
+ private final AtomicLong qryIdGen;
/** */
private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
@@ -168,9 +170,11 @@ public class GridReduceQueryExecutor {
};
/**
+ * @param qryIdGen Query ID generator.
* @param busyLock Busy lock.
*/
- public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
+ public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) {
+ this.qryIdGen = qryIdGen;
this.busyLock = busyLock;
}
@@ -494,11 +498,13 @@ public class GridReduceQueryExecutor {
}
}
- final long qryReqId = reqIdGen.incrementAndGet();
+ final long qryReqId = qryIdGen.incrementAndGet();
final String space = cctx.name();
- final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize());
+ final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), space,
+ h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize(),
+ U.currentTimeMillis(), cancel);
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -1304,10 +1310,46 @@ public class GridReduceQueryExecutor {
}
/**
+ * Collects queries that have been running longer than the specified duration.
+ *
+ * @param duration Duration to check.
+ * @return Descriptors of the queries in {@code runs} for which {@code longQuery(curTime, duration)} holds.
+ */
+ public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
+ Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+ long curTime = U.currentTimeMillis();
+
+ for (QueryRun run : runs.values()) {
+ if (run.qry.longQuery(curTime, duration))
+ res.add(run.qry);
+ }
+
+ return res;
+ }
+
+ /**
+ * Cancels the specified queries; IDs with no entry in {@code runs} are skipped.
+ *
+ * @param queries IDs of the queries to cancel; must not be {@code null} because it is iterated directly.
+ */
+ public void cancelQueries(Collection<Long> queries) {
+ for (Long qryId : queries) {
+ QueryRun run = runs.get(qryId);
+
+ if (run != null)
+ run.qry.cancel();
+ }
+ }
+
+ /**
* Query run.
*/
private static class QueryRun {
/** */
+ private final GridRunningQueryInfo qry;
+
+ /** */
private final List<GridMergeIndex> idxs;
/** */
@@ -1323,11 +1365,17 @@ public class GridReduceQueryExecutor {
private final AtomicReference<Object> state = new AtomicReference<>();
/**
+ * @param id Query ID.
+ * @param qry Query text.
+ * @param cache Cache where query was executed.
* @param conn Connection.
* @param idxsCnt Number of indexes.
* @param pageSize Page size.
+ * @param startTime Start time.
+ * @param cancel Query cancel handler.
*/
- private QueryRun(Connection conn, int idxsCnt, int pageSize) {
+ private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
+ this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
this.conn = (JdbcConnection)conn;
this.idxs = new ArrayList<>(idxsCnt);
this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
@@ -1410,4 +1458,4 @@ public class GridReduceQueryExecutor {
return copy(msg, n, partsMap);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
index e47e893..66e7e4a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
@@ -17,15 +17,23 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -54,6 +62,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
cc.setCopyOnRead(true);
cc.setIndexedTypes(Integer.class, Value.class);
+ cc.setSqlFunctionClasses(TestSQLFunctions.class);
cfg.setCacheConfiguration(cc);
@@ -72,7 +81,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
IgniteCache<Integer, Value> cache = grid(0).cache(null);
for (int i = 0; i < KEYS; i++)
- cache.put(i, new Value("before"));
+ cache.put(i, new Value(i, "before-" + i));
}
/** {@inheritDoc} */
@@ -195,17 +204,148 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
check(cache);
}
- /** */
- private static class Value {
- /** */
- private String str;
+ /**
+ * Run specified query in separate thread.
+ *
+ * @param qry Query to execute.
+ */
+ private IgniteInternalFuture<?> runQueryAsync(final Query<?> qry) throws Exception {
+ return multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ log.info(">>> Query started");
+
+ grid(0).cache(null).query(qry).getAll();
+
+ log.info(">>> Query finished");
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1, "run-query");
+ }
- /**
- * @param str String.
- */
- public Value(String str) {
- this.str = str;
+ /**
+ * Test collecting info about running SQL fields queries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRunningSqlFieldsQuery() throws Exception {
+ IgniteInternalFuture<?> fut = runQueryAsync(new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3"));
+
+ Thread.sleep(500);
+
+ GridQueryProcessor qryProc = grid(0).context().query();
+
+ Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ fut.get();
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3");
+ qry.setLocal(true);
+
+ fut = runQueryAsync(qry);
+
+ Thread.sleep(500);
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ fut.get();
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+ }
+
+ /**
+ * Test collecting info about running SQL queries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRunningSqlQuery() throws Exception {
+ IgniteInternalFuture<?> fut = runQueryAsync(new SqlQuery<Integer, Value>(Value.class, "id > sleep(100)"));
+
+ Thread.sleep(500);
+
+ GridQueryProcessor qryProc = grid(0).context().query();
+
+ Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ fut.get();
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+
+ SqlQuery<Integer, Value> qry = new SqlQuery<>(Value.class, "id > sleep(100)");
+ qry.setLocal(true);
+
+ fut = runQueryAsync(qry);
+
+ Thread.sleep(500);
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ fut.get();
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+ }
+
+ /**
+ * Test canceling a running SQL fields query.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCancelingSqlFieldsQuery() throws Exception {
+ runQueryAsync(new SqlFieldsQuery("select * from (select _val, sleep(100) from Value limit 50)"));
+
+ Thread.sleep(500);
+
+ final GridQueryProcessor qryProc = grid(0).context().query();
+
+ Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ final Collection<GridRunningQueryInfo> finalQueries = queries;
+
+ for (GridRunningQueryInfo query : finalQueries)
+ qryProc.cancelQueries(Collections.singleton(query.id()));
+
+ int n = 100;
+
+ // Give cluster some time to cancel query and cleanup resources.
+ while (n > 0) {
+ Thread.sleep(100);
+
+ queries = qryProc.runningQueries(0);
+
+ if (queries.isEmpty())
+ break;
+
+ log.info(">>>> Wait for cancel: " + n);
+
+ n--;
}
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
}
/**
@@ -218,9 +358,53 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
for (Cache.Entry<Integer, Value> entry : cache) {
cnt++;
- assertEquals("before", entry.getValue().str);
+ assertEquals("before-" + entry.getKey(), entry.getValue().str);
}
assertEquals(KEYS, cnt);
}
-}
\ No newline at end of file
+
+ /** */
+ private static class Value {
+ /** */
+ @QuerySqlField
+ private int id;
+
+ /** */
+ @QuerySqlField
+ private String str;
+
+ /**
+ * @param id ID.
+ * @param str String.
+ */
+ public Value(int id, String str) {
+ this.id = id;
+ this.str = str;
+ }
+ }
+
+ /**
+ * Utility class with custom SQL functions.
+ */
+ public static class TestSQLFunctions {
+ /**
+ * Sleep function to simulate long running queries.
+ *
+ * @param x Time to sleep.
+ * @return Return specified argument.
+ */
+ @QuerySqlFunction
+ public static long sleep(long x) {
+ if (x >= 0)
+ try {
+ Thread.sleep(x);
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+
+ return x;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 1f10593..01fefa3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -477,4 +477,4 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
return storeId;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11bbec48/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index ad8a7e3..814d0e0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -106,6 +106,9 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
spi.registerCache(null, cacheCfg("B"));
}
+ /**
+ * @param name Name.
+ */
private CacheConfiguration cacheCfg(String name) {
CacheConfiguration<?,?> cfg = new CacheConfiguration<>();
@@ -114,6 +117,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
return cfg;
}
+ /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
idx.stop();
@@ -182,6 +186,9 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
return idx;
}
+ /**
+ * @return {@code true} if OFF-HEAP mode should be tested.
+ */
protected boolean offheap() {
return false;
}
[08/13] ignite git commit: IGNITE-4740 - Fix. Service could be
deployed/undeployed twice on concurrent cancel and discovery event.
Posted by nt...@apache.org.
IGNITE-4740 - Fix. Service could be deployed/undeployed twice on concurrent cancel and discovery event.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fcb3e74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fcb3e74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fcb3e74
Branch: refs/heads/master
Commit: 9fcb3e74f91c8497b7b1358cdff40950cdf5c568
Parents: c0e2df2
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Feb 28 16:05:06 2017 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Feb 28 16:05:06 2017 +0300
----------------------------------------------------------------------
.../cache/DynamicCacheChangeBatch.java | 14 ++
.../service/GridServiceProcessor.java | 49 +++---
.../GridServiceContinuousQueryRedeploy.java | 167 +++++++++++++++++++
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
4 files changed, 208 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 4dcff9b..a250063 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -113,6 +113,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
return clientReconnect;
}
+ /**
+ * @return {@code True} if request should trigger partition exchange.
+ */
+ public boolean exchangeNeeded() {
+ if (reqs != null) {
+ for (DynamicCacheChangeRequest req : reqs) {
+ if (req.exchangeNeeded())
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeBatch.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 3690f35..4eeafed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -65,10 +65,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.continuous.AbstractContinuousMessage;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1468,19 +1470,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
else {
String name = e.getKey().name();
- svcName.set(name);
-
- Collection<ServiceContextImpl> ctxs;
-
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
-
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
- }
+ undeploy(name);
// Finish deployment futures if undeployment happened.
GridFutureAdapter<?> fut = depFuts.remove(name);
@@ -1586,6 +1576,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
return;
}
+ else if (msg instanceof DynamicCacheChangeBatch) {
+ if (!((DynamicCacheChangeBatch)msg).exchangeNeeded())
+ return;
+ }
+ else
+ return;
}
else
topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
@@ -1771,21 +1767,26 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
// Handle undeployment.
- else {
- String name = e.getKey().name();
+ else
+ undeploy(e.getKey().name());
+ }
- svcName.set(name);
- Collection<ServiceContextImpl> ctxs;
+ /**
+ * @param name Name.
+ */
+ private void undeploy(String name) {
+ svcName.set(name);
- synchronized (locSvcs) {
- ctxs = locSvcs.remove(name);
- }
+ Collection<ServiceContextImpl> ctxs;
- if (ctxs != null) {
- synchronized (ctxs) {
- cancel(ctxs, ctxs.size());
- }
+ synchronized (locSvcs) {
+ ctxs = locSvcs.remove(name);
+ }
+
+ if (ctxs != null) {
+ synchronized (ctxs) {
+ cancel(ctxs, ctxs.size());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
new file mode 100644
index 0000000..1a9ef3a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceContinuousQueryRedeploy.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that concurrently cancelling a service and registering a ContinuousQuery doesn't cause
+ * service redeployment.
+ */
+public class GridServiceContinuousQueryRedeploy extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE_NAME = "TEST_CACHE";
+
+ /** */
+ private static final String TEST_KEY = "TEST_KEY";
+
+ /** */
+ private static final String SERVICE_NAME = "service1";
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServiceRedeploymentAfterCancel() throws Exception {
+ final Ignite ignite = startGrid(0);
+
+ final IgniteCache<Object, Object> managementCache = ignite.getOrCreateCache(CACHE_NAME);
+
+ final ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+ final List<Object> evts = Collections.synchronizedList(new ArrayList<>());
+
+ qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+ @Override public void onUpdated(
+ Iterable<CacheEntryEvent<?, ?>> iterable) throws CacheEntryListenerException {
+ for (CacheEntryEvent<?, ?> event : iterable)
+ evts.add(event);
+ }
+ });
+
+ int iterations = 100;
+
+ while (iterations-- > 0) {
+ QueryCursor quorumCursor = managementCache.query(qry);
+
+ IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ System.out.println("Deploy " + SERVICE_NAME);
+ deployService(ignite);
+
+ return null;
+ }
+ });
+
+ IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ System.out.println("Undeploy " + SERVICE_NAME);
+ ignite.services().cancel(SERVICE_NAME);
+
+ return null;
+ }
+ });
+
+ fut1.get();
+ fut2.get();
+
+ U.sleep(100);
+
+ assert evts.size() <= 1 : evts.size();
+
+ ignite.services().cancel("service1");
+
+ evts.clear();
+
+ quorumCursor.close();
+ }
+
+ }
+
+ /**
+ * @param ignite Ignite.
+ */
+ private void deployService(final Ignite ignite) {
+ ServiceConfiguration svcCfg = new ServiceConfiguration();
+
+ svcCfg.setService(new ManagementService());
+ svcCfg.setName(SERVICE_NAME);
+ svcCfg.setTotalCount(1);
+ svcCfg.setMaxPerNodeCount(1);
+ svcCfg.setNodeFilter(new IgnitePredicate<ClusterNode>() {
+ @Override public boolean apply(ClusterNode node) {
+ return !node.isClient();
+ }
+ });
+
+ ignite.services().deploy(svcCfg);
+ }
+
+ /**
+ *
+ */
+ public static class ManagementService implements Service {
+ /** */
+ private final String name = UUID.randomUUID().toString();
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ System.out.println(name + " shutdown.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void init(ServiceContext ctx) throws Exception {
+ System.out.println(name + " initializing.");
+
+ ignite.cache(CACHE_NAME).put(TEST_KEY, name + " init");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) throws Exception {
+ // No-op
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fcb3e74/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 350b715..5977702 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cluster.GridAddressResolverSelfTest
import org.apache.ignite.internal.processors.cluster.GridUpdateNotifierSelfTest;
import org.apache.ignite.internal.processors.port.GridPortProcessorSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceClientNodeTest;
+import org.apache.ignite.internal.processors.service.GridServiceContinuousQueryRedeploy;
import org.apache.ignite.internal.processors.service.GridServicePackagePrivateSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeConfigSelfTest;
import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeSelfTest;
@@ -143,6 +144,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
suite.addTestSuite(IgniteServiceReassignmentTest.class);
suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
+ suite.addTestSuite(GridServiceContinuousQueryRedeploy.class);
suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);