You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/09/23 08:55:16 UTC
svn commit: r1626954 -
/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Author: kwall
Date: Tue Sep 23 06:55:16 2014
New Revision: 1626954
URL: http://svn.apache.org/r1626954
Log:
QPID-6111: [Java Broker] HA - Ensure that when the REF is shutdown sufficient time is allowed to allow any in progress JE ReplicatedEnvironment to complete.
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1626954&r1=1626953&r2=1626954&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Sep 23 06:55:16 2014
@@ -188,17 +188,17 @@ public class ReplicatedEnvironmentFacade
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
+ private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
+ private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
private volatile Durability _realMessageStoreDurability = null;
+ private volatile Durability _messageStoreDurability;
private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private volatile Durability _messageStoreDurability;
-
- private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
- private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
- private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
+ private volatile long _envSetupTimeoutMillis;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
{
@@ -306,8 +306,11 @@ public class ReplicatedEnvironmentFacade
LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get());
}
- shutdownAndAwaitExecutorService(_environmentJobExecutor);
- shutdownAndAwaitExecutorService(_groupChangeExecutor);
+ long timeout = Math.min(_executorShutdownTimeout, _envSetupTimeoutMillis);
+ shutdownAndAwaitExecutorService(_environmentJobExecutor,
+ timeout,
+ TimeUnit.MILLISECONDS);
+ shutdownAndAwaitExecutorService(_groupChangeExecutor, _executorShutdownTimeout, TimeUnit.MILLISECONDS);
try
{
@@ -347,17 +350,17 @@ public class ReplicatedEnvironmentFacade
}
}
- private void shutdownAndAwaitExecutorService(ExecutorService executorService)
+ private void shutdownAndAwaitExecutorService(ExecutorService executorService, long executorShutdownTimeout, TimeUnit timeUnit)
{
executorService.shutdown();
try
{
- boolean wasShutdown = executorService.awaitTermination(_executorShutdownTimeout, TimeUnit.MILLISECONDS);
+ boolean wasShutdown = executorService.awaitTermination(executorShutdownTimeout, timeUnit);
if (!wasShutdown)
{
LOGGER.warn("Executor service " + executorService +
- " did not shutdown within allowed time period " + _executorShutdownTimeout +
- ", ignoring");
+ " did not shutdown within allowed time period " + _executorShutdownTimeout
+ + " " + timeUnit + ", ignoring");
}
}
catch (InterruptedException e)
@@ -434,7 +437,10 @@ public class ReplicatedEnvironmentFacade
@Override
public Database openDatabase(String name, DatabaseConfig databaseConfig)
{
- LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+ }
if (_state.get() != State.OPEN)
{
throw new IllegalStateException("Environment facade is not in opened state");
@@ -452,13 +458,19 @@ public class ReplicatedEnvironmentFacade
Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
if (existingHandle == null)
{
- LOGGER.debug("openDatabase " + name + " new handle");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " new handle");
+ }
cachedHandle = handle;
}
else
{
- LOGGER.debug("openDatabase " + name + " existing handle");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " existing handle");
+ }
cachedHandle = existingHandle;
handle.close();
}
@@ -1091,18 +1103,10 @@ public class ReplicatedEnvironmentFacade
@Override
public ReplicatedEnvironment call() throws Exception
{
- String originalThreadName = Thread.currentThread().getName();
- try
- {
- return createEnvironment(environmentPathFile, envConfig, replicationConfig);
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
+ return createEnvironment(environmentPathFile, envConfig, replicationConfig);
}});
- long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
try
{
return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
@@ -1118,7 +1122,8 @@ public class ReplicatedEnvironmentFacade
}
catch (TimeoutException e)
{
- throw new RuntimeException("JE environment has not been created in due time");
+ throw new RuntimeException("JE replicated environment creation took too long (permitted time "
+ + setUpTimeOutMillis + "ms)");
}
}
@@ -1126,19 +1131,28 @@ public class ReplicatedEnvironmentFacade
final ReplicationConfig replicationConfig)
{
ReplicatedEnvironment environment = null;
+
+ String originalThreadName = Thread.currentThread().getName();
try
{
+ _envSetupTimeoutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
catch (final InsufficientLogException ile)
{
- LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ LOGGER.warn("The log files of this node are too old. Network restore will begin now.", ile);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(ile, config);
+ LOGGER.warn("Network restore complete.");
environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
@@ -1146,6 +1160,11 @@ public class ReplicatedEnvironmentFacade
return environment;
}
+ private long extractEnvSetupTimeoutMillis(ReplicationConfig replicationConfig)
+ {
+ return (long) PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ }
+
public int getNumberOfElectableGroupMembers()
{
if (_state.get() != State.OPEN)
@@ -1339,8 +1358,11 @@ public class ReplicatedEnvironmentFacade
}
else
{
- LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s",
- replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes)));
+ LOGGER.warn(String.format(
+ "Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s",
+ replicationNode.getName(),
+ getHostPort(replicationNode),
+ String.valueOf(_permittedNodes)));
return true;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org