You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2017/12/06 04:04:20 UTC
lucene-solr:master: SOLR-11669: Policy Session lifecycle cleanup
Repository: lucene-solr
Updated Branches:
refs/heads/master e84cce8ea -> 071d9270d
SOLR-11669: Policy Session lifecycle cleanup
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/071d9270
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/071d9270
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/071d9270
Branch: refs/heads/master
Commit: 071d9270d5e02a40bc36833b15f1e738ca2c3e51
Parents: e84cce8
Author: Noble Paul <no...@apache.org>
Authored: Wed Dec 6 15:04:04 2017 +1100
Committer: Noble Paul <no...@apache.org>
Committed: Wed Dec 6 15:04:04 2017 +1100
----------------------------------------------------------------------
.../org/apache/solr/cloud/AddReplicaCmd.java | 15 +-
.../src/java/org/apache/solr/cloud/Assign.java | 11 +-
.../apache/solr/cloud/CreateCollectionCmd.java | 4 +-
.../org/apache/solr/cloud/CreateShardCmd.java | 4 +-
.../java/org/apache/solr/cloud/RestoreCmd.java | 4 +-
.../org/apache/solr/cloud/SplitShardCmd.java | 5 +-
.../org/apache/solr/cloud/UtilizeNodeCmd.java | 12 +-
.../cloud/autoscaling/ComputePlanAction.java | 51 ++--
.../admin/AutoscalingHistoryHandlerTest.java | 2 +-
.../autoscaling/DelegatingCloudManager.java | 3 +-
.../DelegatingDistribStateManager.java | 91 ++++++
.../solrj/cloud/autoscaling/PolicyHelper.java | 288 +++++++++++++++----
.../java/org/apache/solr/common/util/Utils.java | 11 +
.../solrj/cloud/autoscaling/TestPolicy.java | 97 ++++---
14 files changed, 457 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index b5101f5..c785f9f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
@@ -101,8 +101,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
- final Long policyVersionBefore = PolicyHelper.REF_VERSION.get();
- AtomicLong policyVersionAfter = new AtomicLong(-1);
+
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
if (CreateShardCmd.usePolicyFramework(coll, ocmh)) {
@@ -118,9 +118,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
replicaType == Replica.Type.TLOG ? 0 : 1,
replicaType == Replica.Type.PULL ? 0 : 1
).get(0).node;
- if (policyVersionBefore == null && PolicyHelper.REF_VERSION.get() != null) {
- policyVersionAfter.set(PolicyHelper.REF_VERSION.get());
- }
+ sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
}
} else {
node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
@@ -220,9 +218,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
Runnable runnable = () -> {
ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
- if (policyVersionAfter.get() > -1) {
- PolicyHelper.REF_VERSION.remove();
- PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()).decref(policyVersionAfter.get());
+ if (sessionWrapper.get() != null) {
+ sessionWrapper.get().release();
}
if (onComplete != null) onComplete.run();
};
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index 9967b23..fd0738f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -293,15 +293,8 @@ public class Assign {
} else {
if (message.getStr(CREATE_NODE_SET) == null)
nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
- synchronized (PolicyHelper.class) {
- PolicyHelper.SESSION_REF.set(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
- try {
- return getPositionsUsingPolicy(collectionName,
- shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
- } finally {
- PolicyHelper.SESSION_REF.remove();
- }
- }
+ return getPositionsUsingPolicy(collectionName,
+ shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 94d7d9f..2c4f01e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -103,6 +103,7 @@ public class CreateCollectionCmd implements Cmd {
}
ocmh.validateConfigOrThrowSolrException(configName);
+ PolicyHelper.SessionWrapper sessionWrapper = null;
try {
// look at the replication factor and see if it matches reality
@@ -184,6 +185,7 @@ public class CreateCollectionCmd implements Cmd {
}
replicaPositions = Assign.identifyNodes(ocmh
, clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+ sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
}
ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -318,7 +320,7 @@ public class CreateCollectionCmd implements Cmd {
} catch (Exception ex) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
} finally {
- PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+ if(sessionWrapper != null) sessionWrapper.release();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
index 73ac7ff..18b0b63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -88,6 +88,7 @@ public class CreateShardCmd implements Cmd {
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
ZkStateReader zkStateReader = ocmh.zkStateReader;
+ PolicyHelper.SessionWrapper sessionWrapper = null;
boolean usePolicyFramework = usePolicyFramework(collection,ocmh);
List<ReplicaPosition> positions = null;
SolrCloseableLatch countDownLatch;
@@ -103,6 +104,7 @@ public class CreateShardCmd implements Cmd {
numNrtReplicas,
numTlogReplicas,
numPullReplicas);
+ sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
} else {
List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
createNodeSetStr, ocmh.overseer.getSolrCloudManager());
@@ -164,7 +166,7 @@ public class CreateShardCmd implements Cmd {
});
}
} finally {
- PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+ if(sessionWrapper != null) sessionWrapper.release();
}
log.debug("Waiting for create shard action to complete");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 1f9d64a..039ab5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -219,6 +219,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
List<String> sliceNames = new ArrayList<>();
restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
+ PolicyHelper.SessionWrapper sessionWrapper = null;
try {
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
@@ -226,6 +227,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
nodeList, restoreCollectionName,
message, sliceNames,
numNrtReplicas, numTlogReplicas, numPullReplicas);
+ sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
@@ -350,7 +352,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
} finally {
- PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+ if (sessionWrapper != null) sessionWrapper.release();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 2c1e5d6..8f65255 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -92,6 +92,8 @@ public class SplitShardCmd implements Cmd {
DocCollection collection = clusterState.getCollection(collectionName);
DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+ PolicyHelper.SessionWrapper sessionWrapper = null;
+
Slice parentSlice;
@@ -392,6 +394,7 @@ public class SplitShardCmd implements Cmd {
collectionName,
new ZkNodeProps(collection.getProperties()),
subSlices, repFactor - 1, 0, 0);
+ sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
@@ -513,7 +516,7 @@ public class SplitShardCmd implements Cmd {
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
} finally {
- PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+ if (sessionWrapper != null) sessionWrapper.release();
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
index 54044eb..6a55cfd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
@@ -84,7 +84,8 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
executeAll(requests);
- Policy.Session session = autoScalingConfig.getPolicy().createSession(ocmh.overseer.getSolrCloudManager());
+ PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager());
+ Policy.Session session = sessionWrapper.get();
for (; ; ) {
Suggester suggester = session.getSuggester(MOVEREPLICA)
.hint(Suggester.Hint.TARGET_NODE, nodeName);
@@ -96,9 +97,12 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
REPLICA_PROP, request.getParams().get(REPLICA_PROP),
ASYNC, request.getParams().get(ASYNC)));
}
-
-
- executeAll(requests);
+ sessionWrapper.returnSession(session);
+ try {
+ executeAll(requests);
+ } finally {
+ sessionWrapper.release();
+ }
}
private void executeAll(List<ZkNodeProps> requests) throws Exception {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 34c0705..ccffea7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -28,11 +28,12 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,28 +57,42 @@ public class ComputePlanAction extends TriggerActionBase {
if (autoScalingConf.isEmpty()) {
throw new Exception("Action: " + getName() + " executed but no policy is configured");
}
- Policy policy = autoScalingConf.getPolicy();
- Policy.Session session = policy.createSession(cloudManager);
- Suggester suggester = getSuggester(session, event, cloudManager);
- while (true) {
- SolrRequest operation = suggester.getSuggestion();
- if (operation == null) break;
- log.info("Computed Plan: {}", operation.getParams());
- Map<String, Object> props = context.getProperties();
- props.compute("operations", (k, v) -> {
- List<SolrRequest> operations = (List<SolrRequest>) v;
- if (operations == null) operations = new ArrayList<>();
- operations.add(operation);
- return operations;
- });
- session = suggester.getSession();
- suggester = getSuggester(session, event, cloudManager);
+
+ // Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
+// return new PolicyHelper.SessionWrapper(session, null);
+ PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
+ Policy.Session session = sessionWrapper.get();
+// Policy policy = autoScalingConf.getPolicy();
+ try {
+ Suggester suggester = getSuggester(session, event, cloudManager);
+ while (true) {
+ SolrRequest operation = suggester.getSuggestion();
+ if (operation == null) break;
+ log.info("Computed Plan: {}", operation.getParams());
+ Map<String, Object> props = context.getProperties();
+ props.compute("operations", (k, v) -> {
+ List<SolrRequest> operations = (List<SolrRequest>) v;
+ if (operations == null) operations = new ArrayList<>();
+ operations.add(operation);
+ return operations;
+ });
+ session = suggester.getSession();
+ suggester = getSuggester(session, event, cloudManager);
+ }
+ } finally {
+ releasePolicySession(sessionWrapper, session);
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception while processing event: " + event, e); }
}
+ private void releasePolicySession(PolicyHelper.SessionWrapper sessionWrapper, Policy.Session session) {
+ sessionWrapper.returnSession(session);
+ sessionWrapper.release();
+
+ }
+
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
Suggester suggester;
switch (event.getEventType()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
index 0480426..1122952 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
-@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;")
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper=DEBUG")
public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
index 0987db4..09b6193 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
@@ -30,6 +30,7 @@ import org.apache.solr.common.util.ObjectCache;
*/
public class DelegatingCloudManager implements SolrCloudManager {
private final SolrCloudManager delegate;
+ private ObjectCache objectCache = new ObjectCache();
public DelegatingCloudManager(SolrCloudManager delegate) {
this.delegate = delegate;
@@ -57,7 +58,7 @@ public class DelegatingCloudManager implements SolrCloudManager {
@Override
public ObjectCache getObjectCache() {
- return delegate.getObjectCache();
+ return delegate == null ? objectCache : delegate.getObjectCache();
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
new file mode 100644
index 0000000..b47d1c8
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+
+public class DelegatingDistribStateManager implements DistribStateManager {
+ private final DistribStateManager delegate;
+
+ public DelegatingDistribStateManager(DistribStateManager delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean hasData(String path) throws IOException, KeeperException, InterruptedException {
+ return delegate.hasData(path);
+ }
+
+ @Override
+ public List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+ return delegate.listData(path);
+ }
+
+ @Override
+ public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+ return delegate.getData(path, watcher);
+ }
+
+ @Override
+ public VersionedData getData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+ return delegate.getData(path);
+ }
+
+ @Override
+ public void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+ delegate.makePath(path);
+ }
+
+ @Override
+ public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+ return delegate.createData(path, data, mode);
+ }
+
+ @Override
+ public void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+ delegate.removeData(path, version);
+ }
+
+ @Override
+ public void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException {
+ delegate.setData(path, data, version);
+ }
+
+ @Override
+ public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
+ return delegate.multi(ops);
+ }
+
+ @Override
+ public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException {
+ return delegate.getAutoScalingConfig(watcher);
+ }
+
+ @Override
+ public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+ return delegate.getAutoScalingConfig();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 5d179b7..024c6c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -18,13 +18,14 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
@@ -36,12 +37,19 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
+import static org.apache.solr.common.util.Utils.time;
+import static org.apache.solr.common.util.Utils.timeElapsed;
public class PolicyHelper {
private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
SolrCloudManager cloudManager,
Map<String, String> optionalPolicyMapping,
@@ -52,26 +60,45 @@ public class PolicyHelper {
List<String> nodesList) {
List<ReplicaPosition> positions = new ArrayList<>();
ClusterStateProvider stateProvider = new DelegatingClusterStateProvider(cloudManager.getClusterStateProvider()) {
- @Override
- public String getPolicyNameByCollection(String coll) {
- return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?
- optionalPolicyMapping.get(coll) :
- delegate.getPolicyNameByCollection(coll);
- }
- };
+ @Override
+ public String getPolicyNameByCollection(String coll) {
+ return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?
+ optionalPolicyMapping.get(coll) :
+ delegate.getPolicyNameByCollection(coll);
+ }
+ };
SolrCloudManager delegatingManager = new DelegatingCloudManager(cloudManager) {
@Override
public ClusterStateProvider getClusterStateProvider() {
return stateProvider;
}
+
+ @Override
+ public DistribStateManager getDistribStateManager() {
+ if (autoScalingConfig != null) {
+ return new DelegatingDistribStateManager(null) {
+ @Override
+ public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+ return autoScalingConfig;
+ }
+ };
+ } else {
+ return super.getDistribStateManager();
+ }
+ }
};
policyMapping.set(optionalPolicyMapping);
+ SessionWrapper sessionWrapper = null;
Policy.Session session = null;
try {
- session = SESSION_REF.get() != null ?
- SESSION_REF.get().initOrGet(delegatingManager, autoScalingConfig.getPolicy()) :
- autoScalingConfig.getPolicy().createSession(delegatingManager);
+ try {
+ SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
+
+ }
+ session = sessionWrapper.session;
Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
typeVsCount.put(Replica.Type.NRT, nrtReplicas);
@@ -100,15 +127,16 @@ public class PolicyHelper {
}
}
} finally {
- if (session != null && SESSION_REF.get() != null) SESSION_REF.get().updateSession(session);
policyMapping.remove();
+ if (sessionWrapper != null) {
+ sessionWrapper.returnSession(session);
+ }
}
return positions;
}
public static final int SESSION_EXPIRY = 180;//3 seconds
- public static ThreadLocal<Long> REF_VERSION = new ThreadLocal<>();
public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
Policy.Session session = policy.createSession(cloudManager);
@@ -155,80 +183,220 @@ public class PolicyHelper {
return suggestionCtx.getSuggestions();
}
- public static class SessionRef {
- private final AtomicLong myVersion = new AtomicLong(0);
- AtomicInteger refCount = new AtomicInteger();
- private Policy.Session session;
- long lastUsedTime;
+ public enum Status {
+ NULL,
+ //it is just created and not yet used or all operations on it has been competed fully
+ UNUSED,
+ COMPUTING, EXECUTING;
+ }
+
+ /**
+ * This class stores a session for sharing purpose. If a process creates a session to
+ * compute operations,
+ * 1) see if there is a session that is available in the cache,
+ * 2) if yes, check if it is expired
+ * 3) if it is expired, create a new session
+ * 4) if it is not expired, borrow it
+ * 5) after computing operations put it back in the cache
+ */
+ static class SessionRef {
+ private final Object lockObj = new Object();
+ private SessionWrapper sessionWrapper = SessionWrapper.DEF_INST;
+
public SessionRef() {
}
- public long getRefVersion(){
- return myVersion.get();
+
+ //only for debugging
+ SessionWrapper getSessionWrapper() {
+ return sessionWrapper;
}
+ /**
+ * All operations suggested by the current session object
+ * is complete. Do not even cache anything
+ *
+ */
+ private void release(SessionWrapper sessionWrapper) {
+ synchronized (lockObj) {
+ if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
+ log.debug("session set to NULL");
+ this.sessionWrapper = SessionWrapper.DEF_INST;
+ } // else somebody created a new session b/c of expiry . So no need to do anything about it
+ }
+ }
- public void decref(long version) {
- synchronized (SessionRef.class) {
- if (session == null) return;
- if(myVersion.get() != version) return;
- if (refCount.decrementAndGet() <= 0) {
- session = null;
- lastUsedTime = 0;
+ /**
+ * Computing is over for this session and it may contain a new session with new state
+ * The session can be used by others while the caller is performing operations
+ *
+ */
+ private void returnSession(SessionWrapper sessionWrapper) {
+ synchronized (lockObj) {
+ sessionWrapper.status = Status.EXECUTING;
+ log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(MILLISECONDS),
+ sessionWrapper.createTime,
+ this.sessionWrapper.createTime);
+ if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
+ //this session was used for computing new operations and this can now be used for other
+ // computing
+ this.sessionWrapper = sessionWrapper;
+
+ //one thread who is waiting for this need to be notified.
+ lockObj.notify();
+ } else {
+ log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime);
+ //else just ignore it
}
}
- }
- public int getRefCount() {
- return refCount.get();
}
- public Policy.Session get() {
- synchronized (SessionRef.class) {
- if (session == null) return null;
- if (TimeUnit.SECONDS.convert(System.nanoTime() - lastUsedTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
- session = null;
- return null;
+
+ public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+ synchronized (lockObj) {
+ if (sessionWrapper.status == Status.NULL ||
+ TimeUnit.SECONDS.convert(System.nanoTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
+ //no session available or the session is expired
+ return createSession(cloudManager);
} else {
- REF_VERSION.set(myVersion.get());
- refCount.incrementAndGet();
- return session;
+ long waitStart = time(MILLISECONDS);
+ //the session is not expired
+ log.debug("reusing a session {}", this.sessionWrapper.createTime);
+ if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
+ this.sessionWrapper.status = Status.COMPUTING;
+ return sessionWrapper;
+ } else {
+ //status= COMPUTING it's being used for computing. computing is
+ log.debug("session being used. waiting... current time {} ", time(MILLISECONDS));
+ try {
+ lockObj.wait(10 * 1000);//wait for a max of 10 seconds
+ } catch (InterruptedException e) {
+ log.info("interrupted... ");
+ }
+ log.debug("out of waiting curr-time:{} time-elapsed {}", time(MILLISECONDS), timeElapsed(waitStart, MILLISECONDS));
+ // now this thread has woken up because it got timed out after 10 seconds or it is notified after
+ //the session was returned from another COMPUTING operation
+ if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
+ log.debug("Wait over. reusing the existing session ");
+ this.sessionWrapper.status = Status.COMPUTING;
+ return sessionWrapper;
+ } else {
+ //create a new Session
+ return createSession(cloudManager);
+ }
+ }
}
}
+
+
}
- public Policy.Session initOrGet(SolrCloudManager cloudManager, Policy policy) {
- synchronized (SessionRef.class) {
- Policy.Session session = get();
- if (session != null) return session;
- this.session = policy.createSession(cloudManager);
- myVersion.incrementAndGet();
- lastUsedTime = System.nanoTime();
- REF_VERSION.set(myVersion.get());
- refCount.set(1);
- return this.session;
+ private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
+ synchronized (lockObj) {
+ log.debug("Creating a new session");
+ Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
+ log.debug("New session created ");
+ this.sessionWrapper = new SessionWrapper(session, this);
+ this.sessionWrapper.status = Status.COMPUTING;
+ return sessionWrapper;
}
}
- private void updateSession(Policy.Session session) {
- this.session = session;
- lastUsedTime = System.nanoTime();
- }
}
- public static void clearFlagAndDecref(SessionRef policySessionRef) {
- Long refVersion = REF_VERSION.get();
- if (refVersion != null) policySessionRef.decref(refVersion);
- REF_VERSION.remove();
+ /**
+ * How to get a shared Policy Session
+ * 1) call {@link #getSession(SolrCloudManager)}
+ * 2) compute all suggestions
+ * 3) call {@link SessionWrapper#returnSession(Policy.Session)}
+ * 4) perform all suggestions
+ * 5) call {@link SessionWrapper#release()}
+ */
+ public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+ SessionRef sessionRef = (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
+ return sessionRef.get(cloudManager);
}
- public static PolicyHelper.SessionRef getPolicySessionRef(SolrCloudManager cloudManager){
- return (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
+
+ /**
+ * Use this to get the last used session wrapper in this thread
+ *
+ * @param clear whether to unset the threadlocal or not
+ */
+ public static SessionWrapper getLastSessionWrapper(boolean clear) {
+ SessionWrapper wrapper = SESSION_WRAPPPER_REF.get();
+ if (clear) SESSION_WRAPPPER_REF.remove();
+ return wrapper;
+
}
- public static ThreadLocal<SessionRef> SESSION_REF = new ThreadLocal<>();
+ static ThreadLocal<SessionWrapper> SESSION_WRAPPPER_REF = new ThreadLocal<>();
+
+
+ public static class SessionWrapper {
+ public static final SessionWrapper DEF_INST = new SessionWrapper(null, null);
+
+ static {
+ DEF_INST.status = Status.NULL;
+ DEF_INST.createTime = -1l;
+ DEF_INST.lastUpdateTime = -1l;
+ }
+
+ private long createTime;
+ private long lastUpdateTime;
+ private Policy.Session session;
+ public Status status;
+ private final SessionRef ref;
+ private AtomicInteger refCount = new AtomicInteger();
+
+ public long getCreateTime() {
+ return createTime;
+ }
+
+ public long getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ public SessionWrapper(Policy.Session session, SessionRef ref) {
+ lastUpdateTime = createTime = System.nanoTime();
+ this.session = session;
+ this.status = Status.UNUSED;
+ this.ref = ref;
+ }
+
+ public Policy.Session get() {
+ return session;
+ }
+
+ public SessionWrapper update(Policy.Session session) {
+ this.lastUpdateTime = System.nanoTime();
+ this.session = session;
+ return this;
+ }
+
+ public int getRefCount() {
+ return refCount.get();
+ }
+
+ /**
+ * return this for later use and update the session with the latest state
+ * ensure that this is done after computing the suggestions
+ */
+ public void returnSession(Policy.Session session) {
+ this.update(session);
+ refCount.incrementAndGet();
+ ref.returnSession(this);
+ }
+
+ //all ops are executed now it can be destroyed
+ public void release() {
+ refCount.decrementAndGet();
+ ref.release(this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index 8dd431b..93af8c3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableSet;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class Utils {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -442,4 +444,13 @@ public class Utils {
}
}
}
+
+ public static long time(TimeUnit unit) {
+ return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ }
+
+ public static long timeElapsed(long start, TimeUnit unit) {
+ return unit.convert(System.nanoTime() - NANOSECONDS.convert(start, unit), NANOSECONDS);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index 31e316e..396234b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.ImmutableList;
import org.apache.solr.SolrTestCaseJ4;
@@ -846,15 +847,15 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
@Test
- public void testSessionCaching() {
- PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
+ public void testSessionCaching() throws IOException, InterruptedException {
+// PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
String autoScalingjson = " '{cluster-policy':[" +
" { 'cores':'<10', 'node':'#ANY'}," +
" { 'replica':'<2', 'shard':'#EACH', 'node':'#ANY'}," +
" { 'nodeRole':'overseer','replica':0}]," +
" 'cluster-preferences':[{'minimize':'cores'}]}";
Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
- PolicyHelper.SESSION_REF.set(ref1);
+// PolicyHelper.SESSION_REF.set(ref1);
String nodeValues = " {" +
" 'node4':{" +
" 'node':'10.0.0.4:8987_solr'," +
@@ -870,7 +871,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
" 'freedisk':884.7097854614258}," +
"}";
- SolrCloudManager provider = getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues), clusterState);
+
Map policies = (Map) Utils.fromJSONString("{" +
" 'cluster-preferences': [" +
" { 'maximize': 'freedisk', 'precision': 50}," +
@@ -882,44 +883,70 @@ public class TestPolicy extends SolrTestCaseJ4 {
" ]" +
"}");
AutoScalingConfig config = new AutoScalingConfig(policies);
+ final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues),
+ clusterState)) {
+ @Override
+ public DistribStateManager getDistribStateManager() {
+ return delegatingDistribStateManager(config);
+ }
+ };
- List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, provider, null,
+ List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, solrCloudManager, null,
Arrays.asList("s1", "s2"), 1, 0, 0,
null);
- long sessionRefVersion = PolicyHelper.REF_VERSION.get();
- PolicyHelper.SessionRef ref1Copy = PolicyHelper.SESSION_REF.get();
- PolicyHelper.SESSION_REF.remove();
- Policy.Session session = ref1Copy.get();
+ PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
+ assertNotNull(sessionRef);
+ PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
+
+
+ Policy.Session session = sessionWrapper.get();
assertNotNull(session);
- assertEquals(ref1, ref1Copy);
- assertTrue(session.getPolicy() == config.getPolicy());
- ref1Copy.decref(sessionRefVersion);
- PolicyHelper.SESSION_REF.set(ref1);
- AutoScalingConfig config2 = new AutoScalingConfig(policies);
- locations = PolicyHelper.getReplicaLocations("c2", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
- null);
- sessionRefVersion = PolicyHelper.REF_VERSION.get();
- ref1Copy = PolicyHelper.SESSION_REF.get();
- PolicyHelper.SESSION_REF.remove();
- session = ref1Copy.get();
- ref1Copy.decref(sessionRefVersion);
- assertEquals(ref1, ref1Copy);
- assertFalse(session.getPolicy() == config2.getPolicy());
assertTrue(session.getPolicy() == config.getPolicy());
- assertEquals(2, ref1Copy.getRefCount());
- ref1.decref(sessionRefVersion);//decref 1
- ref1.decref(sessionRefVersion);//decref 2
- PolicyHelper.SESSION_REF.set(ref1);
- locations = PolicyHelper.getReplicaLocations("c3", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
- null);
- sessionRefVersion = PolicyHelper.REF_VERSION.get();
- ref1Copy = PolicyHelper.SESSION_REF.get();
- PolicyHelper.SESSION_REF.remove();
- session = ref1Copy.get();
- ref1Copy.decref(sessionRefVersion);
- assertTrue(session.getPolicy() == config2.getPolicy());
+ assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
+ sessionWrapper.release();
+ assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+ PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
+ assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
+ PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
+ AtomicLong secondTime = new AtomicLong();
+ Thread thread = new Thread(() -> {
+ try {
+ s2[0] = PolicyHelper.getSession(solrCloudManager);
+ secondTime.set(System.nanoTime());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread.start();
+ Thread.sleep(50);
+ long beforeReturn = System.nanoTime();
+ assertEquals(s1.getCreateTime(), sessionRef.getSessionWrapper().getCreateTime());
+ s1.returnSession(s1.get());
+ assertEquals(1, s1.getRefCount());
+ thread.join();
+ assertNotNull(s2[0]);
+ assertTrue(secondTime.get() > beforeReturn);
+ assertTrue(s1.getCreateTime() == s2[0].getCreateTime());
+
+ s2[0].returnSession(s2[0].get());
+ assertEquals(2, s1.getRefCount());
+
+ s2[0].release();
+ assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+ s1.release();
+ assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+
+ }
+
+ private DistribStateManager delegatingDistribStateManager(AutoScalingConfig config) {
+ return new DelegatingDistribStateManager(null) {
+ @Override
+ public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+ return config;
+ }
+ };
}
public void testNegativeConditions() {