You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/08/15 07:38:32 UTC
[03/30] ignite git commit: IGNITE-2310 Lock cache partition for
affinityRun/affinityCall execution
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
new file mode 100644
index 0000000..168b25c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
@@ -0,0 +1,852 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310
+ */
+public class IgniteCacheLockPartitionOnAffinityRunTest extends IgniteCacheLockPartitionOnAffinityRunAbstractTest {
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ * @throws Exception If failed.
+ */
+ private static int getPersonsCountFromPartitionMapCheckBothCaches(final IgniteEx ignite, IgniteLogger log,
+ int orgId) throws Exception {
+
+ assertEquals(1, getOrganizationCountFromPartitionMap(ignite, log, orgId));
+
+ return getPersonsCountFromPartitionMap(ignite, log, orgId);
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ * @throws Exception If failed.
+ */
+ private static int getOrganizationCountFromPartitionMap(final IgniteEx ignite, IgniteLogger log,
+ int orgId) throws Exception {
+ int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
+
+ GridCacheAdapter<?, ?> cacheAdapterOrg = ignite.context().cache()
+ .internalCache(Organization.class.getSimpleName());
+
+ GridDhtLocalPartition pOrgs = cacheAdapterOrg.context().topology()
+ .localPartition(part, AffinityTopologyVersion.NONE, false);
+
+ int cnt = 0;
+ for (GridCacheMapEntry e : pOrgs.entries()) {
+ Integer k = (Integer)e.keyValue(false);
+ Organization org = e.val.value(ignite.context().cacheObjects().contextForCache(
+ cacheAdapterOrg.cacheCfg), false);
+
+ if (org != null && org.getId() == orgId)
+ ++cnt;
+ }
+
+ return cnt;
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ * @throws Exception If failed.
+ */
+ private static int getPersonsCountFromPartitionMap(final IgniteEx ignite, IgniteLogger log, int orgId)
+ throws Exception {
+ int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
+
+ GridCacheAdapter<?, ?> cacheAdapterPers = ignite.context().cache()
+ .internalCache(Person.class.getSimpleName());
+
+ GridDhtLocalPartition pPers = cacheAdapterPers.context().topology()
+ .localPartition(part, AffinityTopologyVersion.NONE, false);
+
+ int cnt = 0;
+ for (GridCacheMapEntry e : pPers.entries()) {
+ Person.Key k = (Person.Key)e.keyValue(false);
+ Person p = e.val.value(ignite.context().cacheObjects().contextForCache(
+ cacheAdapterPers.cacheCfg), false);
+
+ if (p != null && p.getOrgId() == orgId && k.orgId == orgId)
+ ++cnt;
+ }
+
+ return cnt;
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ */
+ private static int getPersonsCountBySqlFieldLocalQuery(final IgniteEx ignite, IgniteLogger log, int orgId) {
+ List res = ignite.cache(Person.class.getSimpleName())
+ .query(new SqlFieldsQuery(
+ String.format("SELECT p.id FROM \"%s\".Person as p " +
+ "WHERE p.orgId = " + orgId,
+ Person.class.getSimpleName())).setLocal(true))
+ .getAll();
+
+ return res.size();
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ */
+ private static int getPersonsCountBySqlFieledLocalQueryJoinOrgs(final IgniteEx ignite, IgniteLogger log,
+ int orgId) {
+ List res = ignite.cache(Person.class.getSimpleName())
+ .query(new SqlFieldsQuery(
+ String.format("SELECT p.id FROM \"%s\".Person as p, \"%s\".Organization as o " +
+ "WHERE p.orgId = o.id " +
+ "AND p.orgId = " + orgId,
+ Person.class.getSimpleName(),
+ Organization.class.getSimpleName())).setLocal(true))
+ .getAll();
+
+ return res.size();
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ */
+ private static int getPersonsCountBySqlLocalQuery(final IgniteEx ignite, IgniteLogger log, int orgId) {
+ List res = ignite.cache(Person.class.getSimpleName())
+ .query(new SqlQuery<Person.Key, Person>(Person.class, "orgId = ?").setArgs(orgId).setLocal(true))
+ .getAll();
+
+ return res.size();
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ */
+ private static int getPersonsCountByScanLocalQuery(final IgniteEx ignite, IgniteLogger log, final int orgId) {
+ List res = ignite.cache(Person.class.getSimpleName())
+ .query(new ScanQuery<>(new IgniteBiPredicate<Person.Key, Person>() {
+ @Override public boolean apply(Person.Key key, Person person) {
+ return person.getOrgId() == orgId;
+ }
+ }).setLocal(true)).getAll();
+
+ return res.size();
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ * @throws Exception If failed.
+ */
+ private static int getPersonsCountSingleCache(final IgniteEx ignite, IgniteLogger log, final int orgId)
+ throws Exception {
+ int sqlCnt = getPersonsCountBySqlLocalQuery(ignite, log, orgId);
+ int sqlFieldCnt = getPersonsCountBySqlFieldLocalQuery(ignite, log, orgId);
+ int scanCnt = getPersonsCountByScanLocalQuery(ignite, log, orgId);
+ int partCnt = getPersonsCountFromPartitionMap(ignite, log, orgId);
+
+ assertEquals(PERS_AT_ORG_CNT, partCnt);
+ assertEquals(partCnt, sqlCnt);
+ assertEquals(partCnt, sqlFieldCnt);
+ assertEquals(partCnt, scanCnt);
+
+ return partCnt;
+ }
+
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Organization id.
+ * @return Count of found Person object with specified orgId
+ * @throws Exception If failed.
+ */
+ private static int getPersonsCountMultipleCache(final IgniteEx ignite, IgniteLogger log, final int orgId)
+ throws Exception {
+ int sqlFieldCnt = getPersonsCountBySqlFieledLocalQueryJoinOrgs(ignite, log, orgId);
+ int partCnt = getPersonsCountFromPartitionMapCheckBothCaches(ignite, log, orgId);
+
+ assertEquals(partCnt, sqlFieldCnt);
+
+ return partCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Workaround for initial update job metadata.
+ grid(0).compute().affinityCall(
+ Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+ 0,
+ new TestAffinityCall(new PersonsCountGetter() {
+ @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
+ return PERS_AT_ORG_CNT;
+ }
+ }, 0));
+
+ grid(0).compute().affinityRun(
+ Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+ 0,
+ new TestAffinityRun(new PersonsCountGetter() {
+ @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
+ return PERS_AT_ORG_CNT;
+ }
+ }, 0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSingleCache() throws Exception {
+ final PersonsCountGetter personsCntGetter = new PersonsCountGetter() {
+ @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
+ return getPersonsCountSingleCache(ignite, log, orgId);
+ }
+ };
+
+ // Run restart threads: start re-balancing.
+ beginNodesRestart();
+
+ IgniteInternalFuture<Long> affFut = null;
+
+ try {
+ final AtomicInteger threadNum = new AtomicInteger(0);
+ affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ if (threadNum.getAndIncrement() % 2 == 0) {
+ while (System.currentTimeMillis() < endTime) {
+ for (final int orgId : orgIds) {
+ if (System.currentTimeMillis() >= endTime)
+ break;
+
+ grid(0).compute().affinityRun(Person.class.getSimpleName(),
+ new Person(0, orgId).createKey(),
+ new TestAffinityRun(personsCntGetter, orgId));
+ }
+ }
+ }
+ else {
+ while (System.currentTimeMillis() < endTime) {
+ for (final int orgId : orgIds) {
+ if (System.currentTimeMillis() >= endTime)
+ break;
+
+ int personsCnt = grid(0).compute().affinityCall(Person.class.getSimpleName(),
+ new Person(0, orgId).createKey(),
+ new TestAffinityCall(personsCntGetter, orgId));
+
+ assertEquals(PERS_AT_ORG_CNT, personsCnt);
+ }
+ }
+ }
+ }
+ }, AFFINITY_THREADS_CNT, "affinity-run");
+ }
+ finally {
+ if (affFut != null)
+ affFut.get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultipleCaches() throws Exception {
+ final PersonsCountGetter personsCntGetter = new PersonsCountGetter() {
+ @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
+ return getPersonsCountMultipleCache(ignite, log, orgId);
+ }
+ };
+ // Run restart threads: start re-balancing
+ beginNodesRestart();
+
+ IgniteInternalFuture<Long> affFut = null;
+ try {
+ final AtomicInteger threadNum = new AtomicInteger(0);
+ affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ if (threadNum.getAndIncrement() % 2 == 0) {
+ while (System.currentTimeMillis() < endTime) {
+ for (final int orgId : orgIds) {
+ if (System.currentTimeMillis() >= endTime)
+ break;
+
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new TestAffinityRun(personsCntGetter, orgId));
+ }
+ }
+ }
+ else {
+ while (System.currentTimeMillis() < endTime) {
+ for (final int orgId : orgIds) {
+ if (System.currentTimeMillis() >= endTime)
+ break;
+
+ int personsCnt = grid(0).compute().affinityCall(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new TestAffinityCall(personsCntGetter, orgId));
+
+ assertEquals(PERS_AT_ORG_CNT, personsCnt);
+ }
+ }
+ }
+
+ }
+ }, AFFINITY_THREADS_CNT, "affinity-run");
+ }
+ finally {
+ if (affFut != null)
+ affFut.get();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testCheckReservePartitionException() throws Exception {
+ int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), OTHER_CACHE_NAME),
+ new Integer(orgId),
+ new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ });
+
+ fail("Exception is expected");
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage()
+ .startsWith("Failed partition reservation. Partition is not primary on the node."));
+ }
+
+ try {
+ grid(0).compute().affinityCall(
+ Arrays.asList(Organization.class.getSimpleName(), OTHER_CACHE_NAME),
+ new Integer(orgId),
+ new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ return null;
+ }
+ });
+
+ fail("Exception is expected");
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage()
+ .startsWith("Failed partition reservation. Partition is not primary on the node."));
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionJobCompletesNormally() throws Exception {
+ final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteRunnable() {
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ @Override public void run() {
+ try {
+ checkPartitionsReservations(ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ }
+ });
+
+ checkPartitionsReservations(grid(1), orgId, 0);
+
+ grid(0).compute().affinityCall(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteCallable<Object>() {
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ @Override public Object call() {
+ try {
+ checkPartitionsReservations(ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ return null;
+ }
+ });
+
+ checkPartitionsReservations(grid(1), orgId, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionJobThrowsException() throws Exception {
+ final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteRunnable() {
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ @Override public void run() {
+ try {
+ checkPartitionsReservations(ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+
+ }
+ throw new RuntimeException("Test job throws exception");
+ }
+ });
+
+ fail("Exception must be thrown");
+ }
+ catch (Exception e) {
+ checkPartitionsReservations(grid(1), orgId, 0);
+ }
+
+ try {
+ grid(0).compute().affinityCall(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteCallable<Object>() {
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ @Override public Object call() {
+ try {
+ checkPartitionsReservations(ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ throw new RuntimeException("Test job throws exception");
+ }
+ });
+
+ fail("Exception must be thrown");
+ }
+ catch (Exception e) {
+ checkPartitionsReservations(grid(1), orgId, 0);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionJobThrowsError() throws Exception {
+ final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteRunnable() {
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ @Override public void run() {
+ try {
+ checkPartitionsReservations(ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ throw new Error("Test job throws error");
+ }
+ });
+
+ fail("Error must be thrown");
+ }
+ catch (Throwable e) {
+ checkPartitionsReservations(grid(1), orgId, 0);
+ }
+
+ try {
+ grid(0).compute().affinityCall(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteCallable<Object>() {
+ @IgniteInstanceResource
+ IgniteEx ignite;
+
+ @Override public Object call() {
+ try {
+ checkPartitionsReservations(ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ throw new Error("Test job throws error");
+ }
+ });
+
+ fail("Error must be thrown");
+ }
+ catch (Throwable e) {
+ checkPartitionsReservations(grid(1), orgId, 0);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionJobUnmarshalingFails() throws Exception {
+ final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new JobFailUnmarshaling());
+ fail("Unmarshaling exception must be thrown");
+ }
+ catch (Exception e) {
+ checkPartitionsReservations(grid(1), orgId, 0);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionJobMasterLeave() throws Exception {
+ final int orgId = primaryKey(grid(0).cache(Organization.class.getSimpleName()));
+
+ try {
+ grid(1).compute().withAsync().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteRunnable() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public void run() {
+ try {
+ checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ }
+ });
+
+ stopGrid(1, true);
+
+ Thread.sleep(3000);
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsReservations(grid(0), orgId, 0);
+ }
+ finally {
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+ }
+
+
+ try {
+ grid(1).compute().withAsync().affinityCall(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new IgniteCallable<Object>() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public Object call() {
+ try {
+ checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ return null;
+ }
+ });
+
+ stopGrid(1, true);
+
+ Thread.sleep(3000);
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsReservations(grid(0), orgId, 0);
+ }
+ finally {
+ startGrid(1);
+
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReleasePartitionJobImplementMasterLeave() throws Exception {
+ final int orgId = primaryKey(grid(0).cache(Organization.class.getSimpleName()));
+
+ try {
+ grid(1).compute().withAsync().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new RunnableWithMasterLeave() {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException {
+ // No-op.
+ }
+
+ @Override public void run() {
+ try {
+ checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ // No-op.
+ }
+ }
+ });
+
+ stopGrid(1, true);
+
+ Thread.sleep(3000);
+
+ awaitPartitionMapExchange();
+
+ checkPartitionsReservations(grid(0), orgId, 0);
+ }
+ finally {
+ startGrid(1);
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /** */
+ private interface PersonsCountGetter {
+ /**
+ * @param ignite Ignite.
+ * @param log Logger.
+ * @param orgId Org id.
+ * @return Count of found Person object with specified orgId
+ * @throws Exception If failed.
+ */
+ int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception;
+ }
+
+ /** */
+ interface RunnableWithMasterLeave extends IgniteRunnable, ComputeJobMasterLeaveAware {
+ }
+
+ /** */
+ private static class TestAffinityCall implements IgniteCallable<Integer> {
+ /** Persons count getter. */
+ PersonsCountGetter personsCntGetter;
+
+ /** Org id. */
+ int orgId;
+
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ public TestAffinityCall() {
+ // No-op.
+ }
+
+ /**
+ * @param personsCntGetter Object to count Person.
+ * @param orgId Organization Id.
+ */
+ public TestAffinityCall(PersonsCountGetter personsCntGetter, int orgId) {
+ this.personsCntGetter = personsCntGetter;
+ this.orgId = orgId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ log.info("Begin call. orgId=" + orgId);
+ return personsCntGetter.getPersonsCount(ignite, log, orgId);
+ }
+ }
+
+ /** */
+ private static class TestAffinityRun implements IgniteRunnable {
+ /** Persons count getter. */
+ PersonsCountGetter personsCntGetter;
+
+ /** Org id. */
+ int orgId;
+
+ /** */
+ @IgniteInstanceResource
+ private IgniteEx ignite;
+
+ /** */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** */
+ public TestAffinityRun() {
+ // No-op.
+ }
+
+ /**
+ * @param personsCntGetter Object to count Person.
+ * @param orgId Organization Id.
+ */
+ public TestAffinityRun(PersonsCountGetter personsCntGetter, int orgId) {
+ this.personsCntGetter = personsCntGetter;
+ this.orgId = orgId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ log.info("Begin run. orgId=" + orgId);
+ int cnt = personsCntGetter.getPersonsCount(ignite, log, orgId);
+ assertEquals(PERS_AT_ORG_CNT, cnt);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /** */
+ static class JobFailUnmarshaling implements Externalizable, IgniteRunnable {
+ /**
+ * Default constructor (required by Externalizable).
+ */
+ public JobFailUnmarshaling() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ //No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ throw new IOException("Test job unmarshaling fails");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ fail("Must not be executed");
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
new file mode 100644
index 0000000..3e9f9d6
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310.
+ */
+public class IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest
+ extends IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java
new file mode 100644
index 0000000..c0b896a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionExternalListener;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.CollisionSpi;
+import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi;
+import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310
+ */
+public class IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest
+ extends IgniteCacheLockPartitionOnAffinityRunAbstractTest {
+
+ private static volatile boolean cancelAllJobs = false;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CollisionSpi colSpi = new AlwaysCancelCollisionSpi();
+
+ cfg.setCollisionSpi(colSpi);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionReservation() throws Exception {
+ int orgId = 0;
+ cancelAllJobs = true;
+ // Workaround for initial update job metadata.
+ try {
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new TestRun(orgId));
+ } catch (Exception e) {
+ // No-op. Swallow exceptions on run (e.g. job canceling etc.).
+ // The test checks only correct partition release in case CollisionSpi is used.
+ }
+ // All partition must be released in spite of any exceptions during the job executions.
+ cancelAllJobs = false;
+ ClusterNode n = grid(0).context().affinity()
+ .mapKeyToNode(Organization.class.getSimpleName(), orgId);
+ checkPartitionsReservations((IgniteEx)grid(n), orgId, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void _testJobFinishing() throws Exception {
+// fail("Affinity run / call doesn't receive response where many job rejections happen.");
+ final AtomicInteger jobNum = new AtomicInteger(0);
+
+ cancelAllJobs = true;
+
+ IgniteInternalFuture<Long> affFut = null;
+ try {
+ affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ while (System.currentTimeMillis() < endTime) {
+ int n = 0;
+ try {
+ for (final int orgId : orgIds) {
+ if (System.currentTimeMillis() >= endTime)
+ break;
+
+ n = jobNum.getAndIncrement();
+
+ log.info("+++ Job submitted " + n);
+ grid(0).compute().affinityRun(
+ Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+ new Integer(orgId),
+ new TestRun(n));
+ }
+ }
+ catch (Exception e) {
+ log.info("+++ Job failed " + n + " " + e.toString());
+ // No-op. Swallow exceptions on run (e.g. job canceling etc.).
+ }
+ }
+
+ }
+ }, AFFINITY_THREADS_CNT, "affinity-run");
+ }
+ finally {
+ if (affFut != null)
+ affFut.get();
+
+ stopRestartThread.set(true);
+
+ cancelAllJobs = false;
+
+ // Should not be timed out.
+ awaitPartitionMapExchange();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestRun implements IgniteRunnable {
+ private int jobNum;
+
+ /** Ignite Logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /**
+ *
+ */
+ public TestRun() {
+
+ }
+
+ /**
+ * @param jobNum Job number.
+ */
+ public TestRun(int jobNum) {
+ this.jobNum = jobNum;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ // No-op.
+ }
+ }
+
+ /** */
+ @SuppressWarnings({"PublicInnerClass"})
+ @IgniteSpiMultipleInstancesSupport(true)
+ public static class AlwaysCancelCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
+ /** Grid logger. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void onCollision(CollisionContext ctx) {
+ Collection<CollisionJobContext> waitJobs = ctx.waitingJobs();
+ if (cancelAllJobs) {
+ for (CollisionJobContext job : waitJobs)
+ job.cancel();
+ } else {
+ for (CollisionJobContext job : waitJobs)
+ job.activate();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStart(String gridName) throws IgniteSpiException {
+ // Start SPI start stopwatch.
+ startStopwatch();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void spiStop() throws IgniteSpiException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java
new file mode 100644
index 0000000..ef00fc3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest;
+
+/**
+ * Compute and Cache tests for affinityRun/Call. These tests is extracted into separate suite
+ * because ones take a lot of time.
+ */
+public class IgniteCacheAffinityRunTestSuite extends TestSuite {
+ /**
+ * @return Test suite.
+ * @throws Exception Thrown in case of the failure.
+ */
+ public static TestSuite suite() throws Exception {
+ TestSuite suite = new TestSuite("Ignite Compute and Cache Affinity Run Test Suite");
+
+ suite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunTest.class);
+ suite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.class);
+ suite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.class);
+ suite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.class);
+
+ return suite;
+ }
+}
\ No newline at end of file