You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/10/28 21:41:43 UTC

[89/98] [abbrv] incubator-geode git commit: GEODE-2027: ParallelQueueRemovalMessage processing removes events from the region and temp queue

GEODE-2027: ParallelQueueRemovalMessage processing removes events from the region and temp queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b10a171e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b10a171e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b10a171e

Branch: refs/heads/feature/GEM-983
Commit: b10a171e11990a566ed42560a903668908131390
Parents: 87f2fb5
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Fri Oct 21 12:40:41 2016 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Thu Oct 27 09:17:35 2016 -0700

----------------------------------------------------------------------
 .../cache/wan/AbstractGatewaySender.java        |   2 +-
 .../parallel/ParallelQueueRemovalMessage.java   |  22 +-
 .../internal/cache/BucketRegionQueueHelper.java |  58 +++++
 .../ParallelQueueRemovalMessageJUnitTest.java   | 256 +++++++++++++++++++
 .../java/org/apache/geode/test/fake/Fakes.java  |   5 +-
 5 files changed, 326 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 33119bc..e1c9010 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -684,7 +684,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return null;
   }
 
-  final public Set<RegionQueue> getQueues() {
+  public Set<RegionQueue> getQueues() {
     if (this.eventProcessor != null) {
       if (!(this.eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor)) {
         Set<RegionQueue> queues = new HashSet<RegionQueue>();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index a363b5d..bad3d3c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -135,22 +135,14 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
                           afterAckForSecondary_EventInBucket(abstractSender, brq, key);
                           destroyKeyFromBucketQueue(brq, key, region);
                           isDestroyed = true;
-                        } else {
-                          // if BucketRegionQueue does not have the key, it
-                          // should be in tempQueue
-                          // remove it from there..defect #49196
-                          isDestroyed =
-                              destroyFromTempQueue(brq.getPartitionedRegion(), (Integer) bId, key);
-                        }
-                        if (!isDestroyed) {
-                          // event is neither destroyed from BucketRegionQueue nor from tempQueue
-                          brq.addToFailedBatchRemovalMessageKeys(key);
-                          if (isDebugEnabled) {
-                            logger.debug(
-                                "Event is neither destroyed from BucketRegionQueue not from tempQueue. Added to failedBatchRemovalMessageKeys: {}",
-                                key);
-                          }
                         }
+
+                        // Even if BucketRegionQueue does not have the key, it could be in the tempQueue
+                        // remove it from there..defect #49196
+                        destroyFromTempQueue(brq.getPartitionedRegion(), (Integer) bId, key);
+
+                        // Finally, add the key to the failed batch removal keys so that it is definitely removed from the bucket region queue
+                        brq.addToFailedBatchRemovalMessageKeys(key);
                       } finally {
                         brq.getInitializationLock().readLock().unlock();
                       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java
new file mode 100644
index 0000000..68b29c2
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Helper class in the internal cache package to access protected BucketRegionQueue methods.
+ */
+public class BucketRegionQueueHelper {
+
+  private BucketRegionQueue bucketRegionQueue;
+
+  public BucketRegionQueueHelper(GemFireCacheImpl cache, PartitionedRegion queueRegion, BucketRegionQueue bucketRegionQueue) {
+    this.bucketRegionQueue = bucketRegionQueue;
+    initialize(cache, queueRegion);
+  }
+
+  public GatewaySenderEventImpl addEvent(Object key) {
+    this.bucketRegionQueue.getEventTracker().setInitialized();
+    this.bucketRegionQueue.entries.disableLruUpdateCallback();
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    this.bucketRegionQueue.entries.initialImagePut(key, 0, event, false, false, null, null, false);
+    this.bucketRegionQueue.entries.enableLruUpdateCallback();
+    return event;
+  }
+
+  public void cleanUpDestroyedTokensAndMarkGIIComplete() {
+    this.bucketRegionQueue.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
+  }
+
+  public void initialize(GemFireCacheImpl cache, PartitionedRegion queueRegion) {
+    InternalDistributedMember member = cache.getMyId();
+    when(queueRegion.getMyId()).thenReturn(member);
+    when(cache.getRegionByPath(this.bucketRegionQueue.getFullPath())).thenReturn(this.bucketRegionQueue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
new file mode 100644
index 0000000..cc9caaf
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.*;
+import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.lru.LRUAlgorithm;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(UnitTest.class)
+public class ParallelQueueRemovalMessageJUnitTest {
+
+  private GemFireCacheImpl cache;
+  private PartitionedRegion queueRegion;
+  private AbstractGatewaySender sender;
+  private PartitionedRegion rootRegion;
+  private BucketRegionQueue bucketRegionQueue;
+  private BucketRegionQueueHelper bucketRegionQueueHelper;
+
+  private static String GATEWAY_SENDER_ID = "ny";
+  private static int BUCKET_ID = 85;
+  private static long KEY = 198l;
+
+  @Before
+  public void setUpGemFire() {
+    createCache();
+    createQueueRegion();
+    createGatewaySender();
+    createRootRegion();
+    createBucketRegionQueue();
+  }
+
+  private void createCache() {
+    // Mock cache
+    this.cache = Fakes.cache();
+    GemFireCacheImpl.setInstanceForTests(this.cache);
+  }
+
+  private void createQueueRegion() {
+    // Mock queue region
+    this.queueRegion = mock(PartitionedRegion.class);
+    when(this.queueRegion.getFullPath()).thenReturn(getRegionQueueName());
+    when(this.queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class));
+    when(this.queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class));
+    when(this.queueRegion.getCache()).thenReturn(this.cache);
+    EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK);
+    LRUAlgorithm algorithm = ea.createEvictionController(this.queueRegion, false);
+    algorithm.getLRUHelper().initStats(this.queueRegion, this.cache.getDistributedSystem());
+    when(this.queueRegion.getEvictionController()).thenReturn(algorithm);
+  }
+
+  private void createGatewaySender() {
+    // Mock gateway sender
+    this.sender = mock(AbstractGatewaySender.class);
+    when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender);
+    when(this.sender.getQueues()).thenReturn(null);
+    when(this.sender.getDispatcherThreads()).thenReturn(1);
+    when(this.sender.getCache()).thenReturn(this.cache);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
+  }
+
+  private void createRootRegion() {
+    // Mock root region
+    this.rootRegion = mock(PartitionedRegion.class);
+    when(this.rootRegion.getFullPath()).thenReturn(Region.SEPARATOR+PartitionedRegionHelper.PR_ROOT_REGION_NAME);
+    when(this.cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(this.rootRegion);
+    when(this.cache.getRegion(getRegionQueueName(), false)).thenReturn(this.queueRegion);
+  }
+
+  private void createBucketRegionQueue() {
+    // Create InternalRegionArguments
+    InternalRegionArguments ira = new InternalRegionArguments();
+    ira.setPartitionedRegion(this.queueRegion);
+    ira.setPartitionedRegionBucketRedundancy(1);
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    ira.setBucketAdvisor(ba);
+    InternalRegionArguments pbrIra = new InternalRegionArguments();
+    RegionAdvisor ra = mock(RegionAdvisor.class);
+    when(ra.getPartitionedRegion()).thenReturn(this.queueRegion);
+    pbrIra.setPartitionedRegionAdvisor(ra);
+    PartitionAttributes pa = mock(PartitionAttributes.class);
+    when(this.queueRegion.getPartitionAttributes()).thenReturn(pa);
+    when(this.queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+    when(pa.getColocatedWith()).thenReturn(null);
+    ProxyBucketRegion pbr = new ProxyBucketRegion(BUCKET_ID, this.queueRegion, pbrIra); // final classes cannot be mocked
+    when(ba.getProxyBucketRegion()).thenReturn(pbr);
+
+    // Create RegionAttributes
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+    factory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK));
+    RegionAttributes attributes = factory.create();
+
+    // Create BucketRegionQueue
+    this.bucketRegionQueue = new BucketRegionQueue(this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira);
+    this.bucketRegionQueueHelper = new BucketRegionQueueHelper(this.cache, this.queueRegion, this.bucketRegionQueue);
+  }
+
+  @After
+  public void tearDownGemFire() {
+    GemFireCacheImpl.setInstanceForTests(null);
+  }
+
+  @Test
+  public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue() throws Exception {
+    // Validate initial BucketRegionQueue state
+    assertFalse(this.bucketRegionQueue.isInitialized());
+    assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+
+    // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to add a key)
+    createAndProcessParallelQueueRemovalMessage();
+
+    // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
+    assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+  }
+
+  @Test
+  public void validateDestroyKeyFromBucketQueueInUninitializedBucketRegionQueue() throws Exception {
+    // Validate initial BucketRegionQueue state
+    assertEquals(0, this.bucketRegionQueue.size());
+    assertFalse(this.bucketRegionQueue.isInitialized());
+
+    // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
+    this.bucketRegionQueueHelper.addEvent(KEY);
+    assertEquals(1, this.bucketRegionQueue.size());
+
+    // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to DESTROYED)
+    when(this.queueRegion.getKeyInfo(KEY, null, null)).thenReturn(new KeyInfo(KEY, null, null));
+    createAndProcessParallelQueueRemovalMessage();
+
+    // Clean up destroyed tokens and validate BucketRegionQueue
+    this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
+    assertEquals(0, this.bucketRegionQueue.size());
+  }
+
+  @Test
+  public void validateDestroyFromTempQueueInUninitializedBucketRegionQueue() throws Exception {
+    // Validate initial BucketRegionQueue state
+    assertFalse(this.bucketRegionQueue.isInitialized());
+
+    // Create a real ConcurrentParallelGatewaySenderQueue
+    ParallelGatewaySenderEventProcessor pgsep = createConcurrentParallelGatewaySenderQueue();
+
+    // Add a mock GatewaySenderEventImpl to the temp queue
+    BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(pgsep, mock(GatewaySenderEventImpl.class));
+    assertEquals(1, tempQueue.size());
+
+    // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to add a key)
+    createAndProcessParallelQueueRemovalMessage();
+
+    // Validate temp queue is empty after processing ParallelQueueRemovalMessage
+    assertEquals(0, tempQueue.size());
+  }
+
+  @Test
+  public void validateDestroyFromBucketQueueAndTempQueueInUninitializedBucketRegionQueue() {
+    // Validate initial BucketRegionQueue state
+    assertFalse(this.bucketRegionQueue.isInitialized());
+    assertEquals(0, this.bucketRegionQueue.size());
+
+    // Create a real ConcurrentParallelGatewaySenderQueue
+    ParallelGatewaySenderEventProcessor pgsep = createConcurrentParallelGatewaySenderQueue();
+
+    // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
+    GatewaySenderEventImpl gsei = this.bucketRegionQueueHelper.addEvent(KEY);
+    assertEquals(1, this.bucketRegionQueue.size());
+
+    // Add a mock GatewaySenderEventImpl to the temp queue
+    BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(pgsep, gsei);
+    assertEquals(1, tempQueue.size());
+
+    // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to DESTROYED)
+    when(this.queueRegion.getKeyInfo(KEY, null, null)).thenReturn(new KeyInfo(KEY, null, null));
+    createAndProcessParallelQueueRemovalMessage();
+
+    // Validate temp queue is empty after processing ParallelQueueRemovalMessage
+    assertEquals(0, tempQueue.size());
+
+    // Clean up destroyed tokens
+    this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
+
+    // Validate BucketRegionQueue is empty after processing ParallelQueueRemovalMessage
+    assertEquals(0, this.bucketRegionQueue.size());
+  }
+
+  private void createAndProcessParallelQueueRemovalMessage() {
+    ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(createRegionToDispatchedKeysMap());
+    pqrm.process(null);
+  }
+
+  private HashMap<String, Map<Integer, List<Long>>> createRegionToDispatchedKeysMap() {
+    HashMap<String, Map<Integer, List<Long>>> regionToDispatchedKeys = new HashMap<>();
+    Map<Integer, List<Long>> bucketIdToDispatchedKeys = new HashMap<>();
+    List<Long> dispatchedKeys = new ArrayList<>();
+    dispatchedKeys.add(KEY);
+    bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
+    regionToDispatchedKeys.put(getRegionQueueName(), bucketIdToDispatchedKeys);
+    return regionToDispatchedKeys;
+  }
+
+  private ParallelGatewaySenderEventProcessor createConcurrentParallelGatewaySenderQueue() {
+    ParallelGatewaySenderEventProcessor pgsep = new ParallelGatewaySenderEventProcessor(sender);
+    ConcurrentParallelGatewaySenderQueue cpgsq = new ConcurrentParallelGatewaySenderQueue(sender, new ParallelGatewaySenderEventProcessor[] {pgsep});
+    Set<RegionQueue> queues = new HashSet<>();
+    queues.add(cpgsq);
+    when(this.sender.getQueues()).thenReturn(queues);
+    return pgsep;
+  }
+
+  private BlockingQueue<GatewaySenderEventImpl> createTempQueueAndAddEvent(ParallelGatewaySenderEventProcessor pgsep, GatewaySenderEventImpl gsei) {
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) pgsep.getQueue();
+    Map<Integer, BlockingQueue<GatewaySenderEventImpl>> tempQueueMap = pgsq.getBucketToTempQueueMap();
+    BlockingQueue<GatewaySenderEventImpl> tempQueue = new LinkedBlockingQueue();
+    when(gsei.getShadowKey()).thenReturn(KEY);
+    tempQueue.add(gsei);
+    tempQueueMap.put(BUCKET_ID, tempQueue);
+    return tempQueue;
+  }
+
+  private String getRegionQueueName() {
+    return Region.SEPARATOR+GATEWAY_SENDER_ID+ ParallelGatewaySenderQueue.QSTRING;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
index 93195c1..2ab64dd 100644
--- a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
+++ b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
@@ -26,7 +26,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.AbstractRegion;
+import org.apache.geode.internal.cache.CachePerfStats;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 
 import java.io.File;
@@ -74,9 +74,11 @@ public class Fakes {
     when(config.getDeployWorkingDir()).thenReturn(new File("."));
 
     when(cache.getDistributedSystem()).thenReturn(system);
+    when(cache.getSystem()).thenReturn(system);
     when(cache.getMyId()).thenReturn(member);
     when(cache.getDistributionManager()).thenReturn(distributionManager);
     when(cache.getCancelCriterion()).thenReturn(systemCancelCriterion);
+    when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
 
     when(system.getDistributedMember()).thenReturn(member);
     when(system.getConfig()).thenReturn(config);
@@ -88,6 +90,7 @@ public class Fakes {
     when(system.createAtomicStatistics(any(), any())).thenReturn(stats);
 
     when(distributionManager.getId()).thenReturn(member);
+    when(distributionManager.getDistributionManagerId()).thenReturn(member);
     when(distributionManager.getConfig()).thenReturn(config);
     when(distributionManager.getSystem()).thenReturn(system);
     when(distributionManager.getCancelCriterion()).thenReturn(systemCancelCriterion);