You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/05 19:13:28 UTC
svn commit: r1548227 - in /manifoldcf/branches/CONNECTORS-781/framework:
agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/
core/src/main/java/org/apache/manifoldcf/core/interfaces/
core/src/main/java/org/apache/manifoldcf/core/lock...
Author: kwright
Date: Thu Dec 5 18:13:27 2013
New Revision: 1548227
URL: http://svn.apache.org/r1548227
Log:
Add anonymous services
Modified:
manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
Modified: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java?rev=1548227&r1=1548226&r2=1548227&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java Thu Dec 5 18:13:27 2013
@@ -35,8 +35,32 @@ public class OutputConnectorPool impleme
/** Local connector pool */
protected final static LocalPool localPool = new LocalPool();
- // This implementation is a place-holder for the real one, which will likely fold in the pooling code
- // as we strip it out of OutputConnectorFactory.
+ // How global connector allocation works:
+ // (1) There is a lock-manager "service" associated with this connector pool. This allows us to clean
+ // up after local pools that have died without being released. There's one service instance per local pool,
+ // and thus one service instance per JVM. NOTE WELL: This means that we need distinct process id's
+ // for non-agent services as well!! How do we do that??
+ // A: -D always overrides processID, but in addition there's a default describing the kind of process it is,
+ // e.g. "" for agents, "AUTH" for authority service, "API" for api service, "UI" for crawler UI, "CL" for
+ // command line.
+ // Next question; how does the process ID get set for these?
+ // A: Good question...
+ // Alternative: SINCE the pools are in-memory-only and static, maybe we can just mint a unique ID for
+ // each pool on each instantiation.
+ // Problem: Guaranteeing uniqueness
+ // Solution: Zookeeper sequential nodes will do that for us, but is there a local/file equivalent? Must be, but lock manager
+ // needs extension, clearly. But given that, it should be possible to have a globally-unique, transient "pool ID".
+ // The transient globally-unique ID seems like the best solution.
+ // (2) Each local pool knows how many connector instances of each type (keyed by class name and config info) there
+ // are.
+ // (3) Each local pool/connector instance type has a local authorization count. This is the amount it's
+ // allowed to actually keep. If the pool has more connectors of a type than the local authorization count permits,
+ // then every connector release operation will destroy the released connector until the local authorization count
+ // is met.
+ // (4) Each local pool/connector instance type needs a global variable describing how many CURRENT instances
+ // the local pool has allocated. This is a transient value which should automatically go to zero if the service becomes inactive.
+ // The lock manager will need to be extended in order to provide this functionality - essentially, transient service-associated data.
+ // There also needs to be a fast way to sum the data for all active services (but that does not need to be a lock-manager primitive)
/** Thread context */
protected final IThreadContext threadContext;
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java?rev=1548227&r1=1548226&r2=1548227&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java Thu Dec 5 18:13:27 2013
@@ -44,10 +44,13 @@ public interface ILockManager
* If registration will succeed, then this method may call an appropriate IServiceCleanup method to clean up either the
* current service, or all services on the cluster.
*@param serviceType is the type of service.
- *@param serviceName is the name of the service to register.
- *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists
+ *@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
+ * created, and will be returned to the caller.
+ *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
+ * May be null. Local service cleanup is never called if the serviceName argument is null.
+ *@return the actual service name.
*/
- public void registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
throws ManifoldCFException;
/** Count all active services of a given type.
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java?rev=1548227&r1=1548226&r2=1548227&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java Thu Dec 5 18:13:27 2013
@@ -87,7 +87,11 @@ public class BaseLockManager implements
protected final static String servicePrefix = "_SERVICE_";
/** A flag prefix, followed by the service type, and then followed by "_" and the service name */
protected final static String activePrefix = "_ACTIVE_";
-
+ /** Anonymous service name prefix, to be followed by an integer */
+ protected final static String anonymousServiceNamePrefix = "_ANON_";
+ /** Anonymous global variable name prefix, to be followed by the service type */
+ protected final static String anonymousServiceTypeCounter = "_SERVICECOUNTER_";
+
/** Register a service and begin service activity.
* This atomic operation creates a permanent registration entry for a service.
* If the permanent registration entry already exists, this method will not create it or
@@ -99,16 +103,22 @@ public class BaseLockManager implements
* If registration will succeed, then this method may call an appropriate IServiceCleanup method to clean up either the
* current service, or all services on the cluster.
*@param serviceType is the type of service.
- *@param serviceName is the name of the service to register.
- *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists
+ *@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
+ * created, and will be returned to the caller.
+ *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
+ * May be null. Local service cleanup is never called if the serviceName argument is null.
+ *@return the actual service name.
*/
@Override
- public void registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
throws ManifoldCFException
{
enterWriteLock(serviceLock);
try
{
+ if (serviceName == null)
+ serviceName = constructUniqueServiceName(serviceType);
+
// First, do an active check
String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
if (checkGlobalFlag(serviceActiveFlag))
@@ -197,6 +207,7 @@ public class BaseLockManager implements
// Last, set the appropriate active flag
setGlobalFlag(serviceActiveFlag);
+ return serviceName;
}
finally
{
@@ -235,6 +246,51 @@ public class BaseLockManager implements
}
}
+ /** Construct a unique service name given the service type.
+ */
+ protected String constructUniqueServiceName(String serviceType)
+ throws ManifoldCFException
+ {
+ String serviceCounterName = makeServiceCounterName(serviceType);
+ int serviceUID = readServiceCounter(serviceCounterName);
+ writeServiceCounter(serviceCounterName,serviceUID+1);
+ return anonymousServiceNamePrefix + serviceUID;
+ }
+
+ /** Make the service counter name for a service type.
+ */
+ protected static String makeServiceCounterName(String serviceType)
+ {
+ return anonymousServiceTypeCounter + serviceType;
+ }
+
+ /** Read service counter.
+ */
+ protected int readServiceCounter(String serviceCounterName)
+ throws ManifoldCFException
+ {
+ byte[] serviceCounterData = readData(serviceCounterName);
+ if (serviceCounterData == null || serviceCounterData.length != 4)
+ return 0;
+ return ((int)serviceCounterData[0]) & 0xff +
+ (((int)serviceCounterData[1]) << 8) & 0xff00 +
+ (((int)serviceCounterData[2]) << 16) & 0xff0000 +
+ (((int)serviceCounterData[3]) << 24) & 0xff000000;
+ }
+
+ /** Write service counter.
+ */
+ protected void writeServiceCounter(String serviceCounterName, int counter)
+ throws ManifoldCFException
+ {
+ byte[] serviceCounterData = new byte[4];
+ serviceCounterData[0] = (byte)(counter & 0xff);
+ serviceCounterData[1] = (byte)((counter >> 8) & 0xff);
+ serviceCounterData[2] = (byte)((counter >> 16) & 0xff);
+ serviceCounterData[3] = (byte)((counter >> 24) & 0xff);
+ writeData(serviceCounterName,serviceCounterData);
+ }
+
/** Clean up any inactive services found.
* Calling this method will invoke cleanup of one inactive service at a time.
* If there are no inactive services around, then false will be returned.
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java?rev=1548227&r1=1548226&r2=1548227&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java Thu Dec 5 18:13:27 2013
@@ -55,14 +55,17 @@ public class LockManager implements ILoc
* If registration will succeed, then this method may call an appropriate IServiceCleanup method to clean up either the
* current service, or all services on the cluster.
*@param serviceType is the type of service.
- *@param serviceName is the name of the service to register.
- *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists
+ *@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
+ * created, and will be returned to the caller.
+ *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
+ * May be null. Local service cleanup is never called if the serviceName argument is null.
+ *@return the actual service name.
*/
@Override
- public void registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
throws ManifoldCFException
{
- lockManager.registerServiceBeginServiceActivity(serviceType, serviceName, cleanup);
+ return lockManager.registerServiceBeginServiceActivity(serviceType, serviceName, cleanup);
}
/** Clean up any inactive services found.
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java?rev=1548227&r1=1548226&r2=1548227&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java Thu Dec 5 18:13:27 2013
@@ -44,6 +44,11 @@ public class ZooKeeperLockManager extend
private final static String SERVICETYPE_ACTIVE_PATH_PREFIX = "/org.apache.manifoldcf.serviceactive-";
private final static String SERVICETYPE_REGISTER_PATH_PREFIX = "/org.apache.manifoldcf.service-";
+ /** Anonymous service name prefix, to be followed by an integer */
+ protected final static String anonymousServiceNamePrefix = "_ANON_";
+ /** Anonymous global variable name prefix, to be followed by the service type */
+ protected final static String anonymousServiceTypeCounter = "_SERVICECOUNTER_";
+
// ZooKeeper connection pool
protected static Integer connectionPoolLock = new Integer(0);
protected static ZooKeeperConnectionPool pool = null;
@@ -100,11 +105,14 @@ public class ZooKeeperLockManager extend
* If registration will succeed, then this method may call an appropriate IServiceCleanup method to clean up either the
* current service, or all services on the cluster.
*@param serviceType is the type of service.
- *@param serviceName is the name of the service to register.
- *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists
+ *@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
+ * created, and will be returned to the caller.
+ *@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
+ * May be null. Local service cleanup is never called if the serviceName argument is null.
+ *@return the actual service name.
*/
@Override
- public void registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
throws ManifoldCFException
{
try
@@ -115,6 +123,9 @@ public class ZooKeeperLockManager extend
enterServiceRegistryLock(connection, serviceType);
try
{
+ if (serviceName == null)
+ serviceName = constructUniqueServiceName(serviceType);
+
String activePath = buildServiceTypeActivePath(serviceType, serviceName);
if (connection.checkNodeExists(activePath))
throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"' is already active");
@@ -175,6 +186,7 @@ public class ZooKeeperLockManager extend
// Last, set the appropriate active flag
connection.createNode(activePath);
+ return serviceName;
}
finally
{
@@ -390,6 +402,51 @@ public class ZooKeeperLockManager extend
connection.releaseLock();
}
+ /** Construct a unique service name given the service type.
+ */
+ protected String constructUniqueServiceName(String serviceType)
+ throws ManifoldCFException
+ {
+ String serviceCounterName = makeServiceCounterName(serviceType);
+ int serviceUID = readServiceCounter(serviceCounterName);
+ writeServiceCounter(serviceCounterName,serviceUID+1);
+ return anonymousServiceNamePrefix + serviceUID;
+ }
+
+ /** Make the service counter name for a service type.
+ */
+ protected static String makeServiceCounterName(String serviceType)
+ {
+ return anonymousServiceTypeCounter + serviceType;
+ }
+
+ /** Read service counter.
+ */
+ protected int readServiceCounter(String serviceCounterName)
+ throws ManifoldCFException
+ {
+ byte[] serviceCounterData = readData(serviceCounterName);
+ if (serviceCounterData == null || serviceCounterData.length != 4)
+ return 0;
+ return ((int)serviceCounterData[0]) & 0xff +
+ (((int)serviceCounterData[1]) << 8) & 0xff00 +
+ (((int)serviceCounterData[2]) << 16) & 0xff0000 +
+ (((int)serviceCounterData[3]) << 24) & 0xff000000;
+ }
+
+ /** Write service counter.
+ */
+ protected void writeServiceCounter(String serviceCounterName, int counter)
+ throws ManifoldCFException
+ {
+ byte[] serviceCounterData = new byte[4];
+ serviceCounterData[0] = (byte)(counter & 0xff);
+ serviceCounterData[1] = (byte)((counter >> 8) & 0xff);
+ serviceCounterData[2] = (byte)((counter >> 16) & 0xff);
+ serviceCounterData[3] = (byte)((counter >> 24) & 0xff);
+ writeData(serviceCounterName,serviceCounterData);
+ }
+
/** Build a zk path for the lock for a specific service type.
*/
protected static String buildServiceTypeLockPath(String serviceType)