You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/02 14:24:46 UTC
svn commit: r1547015 -
/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java
Author: kwright
Date: Mon Dec 2 13:24:45 2013
New Revision: 1547015
URL: http://svn.apache.org/r1547015
Log:
Refactor for testing so there's an AgentsDaemon class that can be instantiated multiple times to generate multiple agents processes in a single JVM.
Added:
manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java (with props)
Added: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java?rev=1547015&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java (added)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java Mon Dec 2 13:24:45 2013
@@ -0,0 +1,378 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import java.io.*;
+import java.util.*;
+
+public class AgentsDaemon
+{
+ /** Revision identifier string (RCS-style Id keyword). */
+ public static final String _rcsid = "@(#)$Id$";
+
+ /** Name of the global flag used as the agents shutdown signal; set and cleared through the lock manager. */
+ public static final String agentShutdownSignal = "_AGENTRUN_";
+ /** Agent service name prefix (followed by agent class name); see getAgentsClassServiceType(). */
+ public static final String agentServicePrefix = "AGENT_";
+
+ /** The agents thread, which starts and stops agents daemons to keep them consistent with the database, and
+ * also takes on process cleanup where necessary. Created by startAgents() and reset to null by stopAgents(). */
+ protected AgentsThread agentsThread = null;
+
+ /** Process ID for this agents daemon, as supplied to the constructor. */
+ protected final String processID;
+
+ /** This is the place we keep track of the agents we've started: agent class name mapped to the
+ * started agent instance. The map object is also the monitor that checkAgents() and stopAgents()
+ * synchronize on. */
+ protected final Map<String,IAgent> runningHash = new HashMap<String,IAgent>();
+
+ // There are a number of different ways of running the agents framework.
+ // (1) Repeatedly call checkAgents(), and when all done make sure to call stopAgents().
+ // (2) Call registerAgentsShutdownHook(), then repeatedly call checkAgents(). Agent shutdown happens on JVM exit.
+ // (3) Call runAgents(), which will wait for someone else to call assertAgentsShutdownSignal(). Before exit, stopAgents() must be called.
+ // (4) Call registerAgentsShutdownHook(), then call runAgents(), which will wait for someone else to call assertAgentsShutdownSignal(). Shutdown happens on JVM exit.
+
+ /** Construct an agents daemon bound to the given process ID.
+ *@param processID identifies this agents daemon. It must be unique across all
+ * agents daemons.
+ */
+ public AgentsDaemon(final String processID)
+ {
+   this.processID = processID;
+ }
+
+ /** Assert the agents shutdown signal.
+ * Sets the global flag named by agentShutdownSignal through the lock manager; this is
+ * the flag runAgents() polls.
+ *@param threadContext is the thread context.
+ */
+ public static void assertAgentsShutdownSignal(IThreadContext threadContext)
+   throws ManifoldCFException
+ {
+   LockManagerFactory.make(threadContext).setGlobalFlag(agentShutdownSignal);
+ }
+
+ /** Clear the agents shutdown signal.
+ * Clears the global flag named by agentShutdownSignal through the lock manager.
+ *@param threadContext is the thread context.
+ */
+ public static void clearAgentsShutdownSignal(IThreadContext threadContext)
+   throws ManifoldCFException
+ {
+   LockManagerFactory.make(threadContext).clearGlobalFlag(agentShutdownSignal);
+ }
+
+
+ /** Register the shutdown hook that stops this daemon's agents.
+ * Call this ONCE, before the first call to startAgents or checkAgents, if agents should be
+ * cleaned up automatically on JVM stop.
+ *@param threadContext is the thread context (not used by this method).
+ */
+ public void registerAgentsShutdownHook(IThreadContext threadContext)
+   throws ManifoldCFException
+ {
+   // The hook works purely from runningHash, so registering it is safe under all conditions.
+   final AgentsShutdownHook hook = new AgentsShutdownHook();
+   org.apache.manifoldcf.core.system.ManifoldCF.addShutdownHook(hook);
+ }
+
+ /** Run the agents process until shutdown is requested.
+ * Returns at once, without starting anything, if the shutdown flag is already set.
+ * Otherwise starts the agents thread and polls the flag, calling ManifoldCF.sleep(5000L)
+ * between checks; it returns when the flag is found set or when the sleep is interrupted.
+ *@param threadContext is the thread context.
+ */
+ public void runAgents(IThreadContext threadContext)
+   throws ManifoldCFException
+ {
+   final ILockManager locks = LockManagerFactory.make(threadContext);
+
+   // A shutdown signal that is already in force means we never come up.
+   if (!locks.checkGlobalFlag(agentShutdownSignal))
+   {
+     // Bring up the background agents thread.
+     startAgents(threadContext);
+
+     // Poll for the shutdown signal, napping between checks.
+     while (!locks.checkGlobalFlag(agentShutdownSignal))
+     {
+       try
+       {
+         ManifoldCF.sleep(5000L);
+       }
+       catch (InterruptedException e)
+       {
+         // Interruption ends the wait as well.
+         break;
+       }
+     }
+   }
+ }
+
+ /** Create the background agents thread for this daemon object and start it.
+ * The new thread is recorded in agentsThread so that stopAgents() can shut it down.
+ *@param threadContext is the thread context (not used by this method).
+ */
+ public void startAgents(IThreadContext threadContext)
+   throws ManifoldCFException
+ {
+   final AgentsThread worker = new AgentsThread();
+   // Publish the thread in the field before it begins running.
+   agentsThread = worker;
+   worker.start();
+ }
+
+ /** Stop all started agents running under this agents daemon.
+ * First shuts down the background agents thread, if there is one, and waits for it to exit.
+ * Then, for every agent recorded in runningHash: stops the agent, ends its service activity,
+ * removes it from runningHash, and calls its cleanUp method.
+ *@param threadContext is the thread context.
+ */
+ public void stopAgents(IThreadContext threadContext)
+   throws ManifoldCFException
+ {
+   // Shut down agents background thread.
+   final AgentsThread thread = agentsThread;
+   if (thread != null)
+   {
+     boolean callerInterrupted = false;
+     while (thread.isAlive())
+     {
+       // Re-interrupt on every pass, in case an earlier interrupt was consumed without the thread exiting.
+       thread.interrupt();
+       try
+       {
+         // Block briefly for the thread to finish instead of spinning on isAlive().
+         thread.join(100L);
+       }
+       catch (InterruptedException e)
+       {
+         // The calling thread was interrupted while waiting.  Shutdown must still complete,
+         // so remember the interruption and keep going; it is restored below.
+         callerInterrupted = true;
+       }
+     }
+     agentsThread = null;
+     // Leave the caller's interrupt status as it would have been had we not waited.
+     if (callerInterrupted)
+       Thread.currentThread().interrupt();
+   }
+
+   // Shut down running agents services directly.
+   ILockManager lockManager = LockManagerFactory.make(threadContext);
+   synchronized (runningHash)
+   {
+     // Removal goes through the iterator, so modifying the map while iterating is safe.
+     Iterator<Map.Entry<String,IAgent>> iter = runningHash.entrySet().iterator();
+     while (iter.hasNext())
+     {
+       Map.Entry<String,IAgent> entry = iter.next();
+       // Read key and value before the entry is removed.
+       String className = entry.getKey();
+       IAgent agent = entry.getValue();
+       // Stop it
+       agent.stopAgent(threadContext);
+       lockManager.endServiceActivity(getAgentsClassServiceType(className), processID);
+       iter.remove();
+       agent.cleanUp(threadContext);
+     }
+   }
+   // Done.
+ }
+
+ protected static String getAgentsClassServiceType(String agentClassName)
+ {
+ return agentServicePrefix + agentClassName;
+ }
+
+ /** Agents thread. This runs in background until interrupted, at which point
+ * it shuts down. Its responsibilities include cleaning up after dead processes,
+ * as well as starting newly-registered agent processes, and terminating ones that disappear.
+ */
+ protected class AgentsThread extends Thread
+ {
+ public AgentsThread()
+ {
+ super();
+ setName("Agents thread");
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ IThreadContext threadContext = ThreadContextFactory.make();
+ while (true)
+ {
+ try
+ {
+ if (Thread.currentThread().isInterrupted())
+ throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+ checkAgents(threadContext);
+ ManifoldCF.sleep(5000L);
+ }
+ catch (InterruptedException e)
+ {
+ break;
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+ break;
+ Logging.agents.error("Exception tossed: "+e.getMessage(),e);
+ }
+ catch (OutOfMemoryError e)
+ {
+ System.err.println("Agents process ran out of memory - shutting down");
+ e.printStackTrace(System.err);
+ System.exit(-200);
+ }
+ catch (Throwable e)
+ {
+ Logging.agents.fatal("Error tossed: "+e.getMessage(),e);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ // Severe error on initialization
+ System.err.println("Agents process could not start - shutting down");
+ Logging.agents.fatal("AgentThread initialization error tossed: "+e.getMessage(),e);
+ System.exit(-300);
+ }
+ }
+ }
+
+ /** Start all not-running agents.
+ *@param threadContext is the thread context.
+ */
+ protected void checkAgents(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ // Get agent manager
+ IAgentManager manager = AgentManagerFactory.make(threadContext);
+ synchronized (runningHash)
+ {
+ String[] classes = manager.getAllAgents();
+ Set<String> currentAgentClasses = new HashSet<String>();
+
+ int i = 0;
+ while (i < classes.length)
+ {
+ String className = classes[i++];
+ if (runningHash.get(className) == null)
+ {
+ // Start this agent
+ IAgent agent = AgentFactory.make(className);
+ agent.initialize(threadContext);
+ try
+ {
+ // Throw a lock, so that cleanup processes and startup processes don't collide.
+ String serviceType = getAgentsClassServiceType(className);
+ lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent, processID));
+ // There is a potential race condition where the agent has been started but hasn't yet appeared in runningHash.
+ // But having runningHash be the synchronizer for this activity will prevent any problems.
+ agent.startAgent(threadContext, processID);
+ // Successful!
+ runningHash.put(className,agent);
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
+ agent.cleanUp(threadContext);
+ throw e;
+ }
+ }
+ currentAgentClasses.add(className);
+ }
+
+ // Go through running hash and look for agents processes that have left
+ Iterator<String> runningAgentsIterator = runningHash.keySet().iterator();
+ while (runningAgentsIterator.hasNext())
+ {
+ String runningAgentClass = runningAgentsIterator.next();
+ if (!currentAgentClasses.contains(runningAgentClass))
+ {
+ // Shut down this one agent.
+ IAgent agent = runningHash.get(runningAgentClass);
+ // Stop it
+ agent.stopAgent(threadContext);
+ lockManager.endServiceActivity(getAgentsClassServiceType(runningAgentClass), processID);
+ runningAgentsIterator.remove();
+ agent.cleanUp(threadContext);
+ }
+ }
+ }
+
+ synchronized (runningHash)
+ {
+ // For every class we're supposed to be running, find registered but no-longer-active instances and clean
+ // up after them.
+ for (String agentsClass : runningHash.keySet())
+ {
+ IAgent agent = runningHash.get(agentsClass);
+ IServiceCleanup cleanup = new CleanupAgent(threadContext, agent, processID);
+ String agentsClassServiceType = getAgentsClassServiceType(agentsClass);
+ while (!lockManager.cleanupInactiveService(agentsClassServiceType, cleanup))
+ {
+ // Loop until no more inactive services
+ }
+ }
+ }
+
+ }
+
+ /** Agent cleanup class. An IServiceCleanup implementation that forwards each cleanup
+ * request to a single agent, passing along the thread context and process ID it was
+ * constructed with.
+ */
+ protected static class CleanupAgent implements IServiceCleanup
+ {
+   protected final IThreadContext threadContext;
+   protected final IAgent agent;
+   protected final String processID;
+
+   /** Constructor.
+   *@param threadContext is the thread context handed to the agent on every call.
+   *@param agent is the agent that performs the actual cleanup.
+   *@param processID is the process ID handed to the agent's data cleanup methods.
+   */
+   public CleanupAgent(IThreadContext threadContext, IAgent agent, String processID)
+   {
+     this.threadContext = threadContext;
+     this.agent = agent;
+     this.processID = processID;
+   }
+
+   /** Clean up after the specified service by delegating to the agent's cleanUpAgentData.
+   * This method will block any startup of the specified service for as long as it runs.
+   *@param serviceName is the name of the service.
+   */
+   @Override
+   public void cleanUpService(String serviceName)
+     throws ManifoldCFException
+   {
+     this.agent.cleanUpAgentData(this.threadContext, this.processID, serviceName);
+   }
+
+   /** Clean up after ALL services of the type on the cluster by delegating to the
+   * agent's cleanUpAllAgentData.
+   */
+   @Override
+   public void cleanUpAllServices()
+     throws ManifoldCFException
+   {
+     this.agent.cleanUpAllAgentData(this.threadContext, this.processID);
+   }
+
+   /** Perform cluster initialization - that is, whatever is needed presuming that the
+   * cluster has been down for an indeterminate period of time, but is otherwise in a clean
+   * state. Delegates to the agent's clusterInit.
+   */
+   @Override
+   public void clusterInit()
+     throws ManifoldCFException
+   {
+     this.agent.clusterInit(this.threadContext);
+   }
+
+ }
+
+ /** Agents shutdown hook class. Its cleanup action stops the agents of the enclosing daemon. */
+ protected class AgentsShutdownHook implements IShutdownHook
+ {
+
+   /** Constructor. */
+   public AgentsShutdownHook()
+   {
+   }
+
+   /** Stop the enclosing daemon's agents, using a freshly made thread context. */
+   @Override
+   public void doCleanup()
+     throws ManifoldCFException
+   {
+     // Shutting down in this way must prevent startup from taking place.
+     stopAgents(ThreadContextFactory.make());
+   }
+
+ }
+
+}
+
Propchange: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java
------------------------------------------------------------------------------
svn:keywords = Id