You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/09/23 08:55:16 UTC

svn commit: r1626954 - /qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Author: kwall
Date: Tue Sep 23 06:55:16 2014
New Revision: 1626954

URL: http://svn.apache.org/r1626954
Log:
QPID-6111: [Java Broker] HA - Ensure that when the REF (ReplicatedEnvironmentFacade) is shut down, sufficient time is allowed for any in-progress JE ReplicatedEnvironment creation to complete.

Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1626954&r1=1626953&r2=1626954&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Sep 23 06:55:16 2014
@@ -188,17 +188,17 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
     private final Durability _defaultDurability;
+    private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
+    private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
 
     private volatile Durability _realMessageStoreDurability = null;
+    private volatile Durability _messageStoreDurability;
     private volatile CoalescingCommiter _coalescingCommiter = null;
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
-    private volatile Durability _messageStoreDurability;
-
-    private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
-    private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
-    private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
+    private volatile long _envSetupTimeoutMillis;
 
     public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
     {
@@ -306,8 +306,11 @@ public class ReplicatedEnvironmentFacade
                     LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get());
                 }
 
-                shutdownAndAwaitExecutorService(_environmentJobExecutor);
-                shutdownAndAwaitExecutorService(_groupChangeExecutor);
+                long timeout = Math.max(_executorShutdownTimeout, _envSetupTimeoutMillis); // wait at least as long as an in-progress JE environment setup is permitted to take
+                shutdownAndAwaitExecutorService(_environmentJobExecutor,
+                                                timeout,
+                                                TimeUnit.MILLISECONDS);
+                shutdownAndAwaitExecutorService(_groupChangeExecutor, _executorShutdownTimeout, TimeUnit.MILLISECONDS);
 
                 try
                 {
@@ -347,17 +350,17 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void shutdownAndAwaitExecutorService(ExecutorService executorService)
+    private void shutdownAndAwaitExecutorService(ExecutorService executorService, long executorShutdownTimeout, TimeUnit timeUnit)
     {
         executorService.shutdown();
         try
         {
-            boolean wasShutdown = executorService.awaitTermination(_executorShutdownTimeout, TimeUnit.MILLISECONDS);
+            boolean wasShutdown = executorService.awaitTermination(executorShutdownTimeout, timeUnit);
             if (!wasShutdown)
             {
                 LOGGER.warn("Executor service " + executorService +
-                            " did not shutdown within allowed time period " + _executorShutdownTimeout +
-                            ", ignoring");
+                            " did not shutdown within allowed time period " + executorShutdownTimeout // report the timeout actually awaited (the parameter), not the field
+                            + " " + timeUnit + ", ignoring");
             }
         }
         catch (InterruptedException e)
@@ -434,7 +437,10 @@ public class ReplicatedEnvironmentFacade
     @Override
     public Database openDatabase(String name, DatabaseConfig databaseConfig)
     {
-        LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+        }
         if (_state.get() != State.OPEN)
         {
             throw new IllegalStateException("Environment facade is not in opened state");
@@ -452,13 +458,19 @@ public class ReplicatedEnvironmentFacade
             Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
             if (existingHandle == null)
             {
-                LOGGER.debug("openDatabase " + name + " new handle");
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("openDatabase " + name + " new handle");
+                }
 
                 cachedHandle = handle;
             }
             else
             {
-                LOGGER.debug("openDatabase " + name + " existing handle");
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("openDatabase " + name + " existing handle");
+                }
                 cachedHandle = existingHandle;
                 handle.close();
             }
@@ -1091,18 +1103,10 @@ public class ReplicatedEnvironmentFacade
             @Override
             public ReplicatedEnvironment call() throws Exception
             {
-                String originalThreadName = Thread.currentThread().getName();
-                try
-                {
-                    return createEnvironment(environmentPathFile, envConfig, replicationConfig);
-                }
-                finally
-                {
-                    Thread.currentThread().setName(originalThreadName);
-                }
+                return createEnvironment(environmentPathFile, envConfig, replicationConfig);
             }});
 
-        long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+        long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
         try
         {
             return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
@@ -1118,7 +1122,8 @@ public class ReplicatedEnvironmentFacade
         }
         catch (TimeoutException e)
         {
-            throw new RuntimeException("JE environment has not been created in due time");
+            throw new RuntimeException("JE replicated environment creation took too long (permitted time "
+                                       + setUpTimeOutMillis + "ms)");
         }
     }
 
@@ -1126,19 +1131,28 @@ public class ReplicatedEnvironmentFacade
             final ReplicationConfig replicationConfig)
     {
         ReplicatedEnvironment environment = null;
+
+        String originalThreadName = Thread.currentThread().getName();
         try
         {
+            _envSetupTimeoutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
             environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
         }
         catch (final InsufficientLogException ile)
         {
-            LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+            LOGGER.warn("The log files of this node are too old. Network restore will begin now.", ile);
             NetworkRestore restore = new NetworkRestore();
             NetworkRestoreConfig config = new NetworkRestoreConfig();
             config.setRetainLogFiles(false);
             restore.execute(ile, config);
+            LOGGER.warn("Network restore complete.");
             environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
         }
+        finally
+        {
+            Thread.currentThread().setName(originalThreadName);
+        }
+
         if (LOGGER.isInfoEnabled())
         {
             LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
@@ -1146,6 +1160,11 @@ public class ReplicatedEnvironmentFacade
         return environment;
     }
 
+    private long extractEnvSetupTimeoutMillis(ReplicationConfig replicationConfig)
+    {
+        return (long) PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); // ENV_SETUP_TIMEOUT parsed by PropUtil; presumably milliseconds (per the method name) - confirm
+    }
+
     public int getNumberOfElectableGroupMembers()
     {
         if (_state.get() != State.OPEN)
@@ -1339,8 +1358,11 @@ public class ReplicatedEnvironmentFacade
         }
         else
         {
-            LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s",
-                    replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes)));
+            LOGGER.warn(String.format(
+                    "Found an intruder node '%s' from '%s'. The node is not listed in permitted list: %s", // stray doubled quote and space before the period removed
+                    replicationNode.getName(),
+                    getHostPort(replicationNode),
+                    String.valueOf(_permittedNodes)));
             return true;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org