You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/06 01:39:01 UTC
svn commit: r1548340 - in /manifoldcf/branches/CONNECTORS-781/framework:
agents/src/main/java/org/apache/manifoldcf/agents/
agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/
agents/src/main/java/org/apache/manifoldcf/agents/system/...
Author: kwright
Date: Fri Dec 6 00:39:00 2013
New Revision: 1548340
URL: http://svn.apache.org/r1548340
Log:
Add multiple-data-type transient service data
Modified:
manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/AgentRun.java
manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java
manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
Modified: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/AgentRun.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/AgentRun.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/AgentRun.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/AgentRun.java Fri Dec 6 00:39:00 2013
@@ -51,7 +51,7 @@ public class AgentRun extends BaseAgents
// AgentStop only, and AgentStop will wait until all services become inactive before exiting.
String processID = ManifoldCF.getProcessID();
ILockManager lockManager = LockManagerFactory.make(tc);
- lockManager.registerServiceBeginServiceActivity(agentServiceType, processID, null, null);
+ lockManager.registerServiceBeginServiceActivity(agentServiceType, processID, null);
try
{
// Register a shutdown hook to make sure we signal that the main agents process is going inactive.
Modified: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnectorpool/OutputConnectorPool.java Fri Dec 6 00:39:00 2013
@@ -47,6 +47,11 @@ public class OutputConnectorPool impleme
// is met.
// (4) Each local pool/connector instance type needs a global variable describing how many CURRENT instances
// the local pool has allocated. This is a transient value which should automatically go to zero if the service becomes inactive.
+ // This is attached to the service as part of the transient service data for the service.
+ // Q: How do we separate instance counts based on pool classname/config??
+ // A: This seems challenging because (a) multiple transient variables seem to be needed, and (b) the variable names are VERY long
+ // (the entire length of the config!!) Alternatively we could use the name of the connection, but then when the connection info changes,
+ // we would need to address this somehow in the pool. How??
// The lock manager will need to be extended in order to provide this functionality - essentially, transient service-associated data.
// There also needs to be a fast way to sum the data for all active services (but that does not need to be a lock-manager primitive)
Modified: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/AgentsDaemon.java Fri Dec 6 00:39:00 2013
@@ -267,7 +267,7 @@ public class AgentsDaemon
{
// Throw a lock, so that cleanup processes and startup processes don't collide.
String serviceType = getAgentsClassServiceType(className);
- lockManager.registerServiceBeginServiceActivity(serviceType, processID, null, new CleanupAgent(threadContext, agent, processID));
+ lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent, processID));
// There is a potential race condition where the agent has been started but hasn't yet appeared in runningHash.
// But having runningHash be the synchronizer for this activity will prevent any problems.
agent.startAgent(threadContext, processID);
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java Fri Dec 6 00:39:00 2013
@@ -46,30 +46,31 @@ public interface ILockManager
*@param serviceType is the type of service.
*@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
* created, and will be returned to the caller.
- *@param serviceData is the initial value of the service's transient data, or null if none.
*@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
* May be null. Local service cleanup is never called if the serviceName argument is null.
*@return the actual service name.
*/
- public String registerServiceBeginServiceActivity(String serviceType, String serviceName, byte[] serviceData,
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName,
IServiceCleanup cleanup)
throws ManifoldCFException;
- /** Update service data for a service.
+ /** Set service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@param serviceData is the data to update to (may be null).
* This updates the service's transient data (or deletes it). If the service is not active, an exception is thrown.
*/
- public void updateServiceData(String serviceType, String serviceName, byte[] serviceData)
+ public void updateServiceData(String serviceType, String serviceName, String dataType, byte[] serviceData)
throws ManifoldCFException;
/** Retrieve service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@return the service's transient data.
*/
- public byte[] retrieveServiceData(String serviceType, String serviceName)
+ public byte[] retrieveServiceData(String serviceType, String serviceName, String dataType)
throws ManifoldCFException;
/** Count all active services of a given type.
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java Fri Dec 6 00:39:00 2013
@@ -79,16 +79,18 @@ public class BaseLockManager implements
// By building on other primitives in this way, the same implementation will suffice for many derived
// lockmanager implementations - although ZooKeeper will want a native form.
- /** The global write lock to control sync */
- protected final static String serviceLock = "_SERVICELOCK_";
+ /** The service-type global write lock to control sync, followed by the service type */
+ protected final static String serviceTypeLockPrefix = "_SERVICELOCK_";
/** A data name prefix, followed by the service type, and then followed by "_" and the instance number */
protected final static String serviceListPrefix = "_SERVICELIST_";
/** A flag prefix, followed by the service type, and then followed by "_" and the service name */
protected final static String servicePrefix = "_SERVICE_";
/** A flag prefix, followed by the service type, and then followed by "_" and the service name */
protected final static String activePrefix = "_ACTIVE_";
- /** A data name prefix, followed by the service type, and then followed by "_" and the service name */
+ /** A data name prefix, followed by the service type, and then followed by "_" and the service name and "_" and the datatype */
protected final static String serviceDataPrefix = "_SERVICEDATA_";
+ /** A data name prefix, followed by the service type, and then followed by "_" and the service name, finally by "_" and the instance number */
+ protected final static String serviceDataListPrefix = "_SERVICEDATALIST_";
/** Anonymous service name prefix, to be followed by an integer */
protected final static String anonymousServiceNamePrefix = "_ANON_";
/** Anonymous global variable name prefix, to be followed by the service type */
@@ -107,17 +109,17 @@ public class BaseLockManager implements
*@param serviceType is the type of service.
*@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
* created, and will be returned to the caller.
- *@param serviceData is the initial value of the service's transient data, or null if none.
*@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
* May be null. Local service cleanup is never called if the serviceName argument is null.
*@return the actual service name.
*/
@Override
- public String registerServiceBeginServiceActivity(String serviceType, String serviceName, byte[] serviceData,
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName,
IServiceCleanup cleanup)
throws ManifoldCFException
{
- enterWriteLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterWriteLock(serviceTypeLockName);
try
{
if (serviceName == null)
@@ -211,62 +213,81 @@ public class BaseLockManager implements
// Last, set the appropriate active flag
setGlobalFlag(serviceActiveFlag);
- writeServiceData(serviceType, serviceName, serviceData);
return serviceName;
}
finally
{
- leaveWriteLock(serviceLock);
+ leaveWriteLock(serviceTypeLockName);
}
}
- /** Update service data for a service.
+ /** Set service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@param serviceData is the data to update to (may be null).
* This updates the service's transient data (or deletes it). If the service is not active, an exception is thrown.
*/
@Override
- public void updateServiceData(String serviceType, String serviceName, byte[] serviceData)
+ public void updateServiceData(String serviceType, String serviceName, String dataType, byte[] serviceData)
throws ManifoldCFException
{
- enterWriteLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterWriteLock(serviceTypeLockName);
try
{
String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
if (!checkGlobalFlag(serviceActiveFlag))
throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"' is not active");
- writeServiceData(serviceType, serviceName, serviceData);
+ // This implementation is pretty lame - need to replace it with something which doesn't degrade with the
+ // number of data types being used, since they'll be a data type for each connection name.
+ // MHL
+ int i = 0;
+ while (true)
+ {
+ String dataTypeCandidate = readDataType(serviceType, serviceName, i);
+ if (dataTypeCandidate == null)
+ {
+ writeDataType(serviceType, serviceName, i, dataType);
+ break;
+ }
+ if (dataTypeCandidate.equals(dataType))
+ break;
+ i++;
+ }
+ writeServiceData(serviceType, serviceName, dataType, serviceData);
}
finally
{
- leaveWriteLock(serviceLock);
+ leaveWriteLock(serviceTypeLockName);
}
}
/** Retrieve service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@return the service's transient data.
*/
@Override
- public byte[] retrieveServiceData(String serviceType, String serviceName)
+ public byte[] retrieveServiceData(String serviceType, String serviceName, String dataType)
throws ManifoldCFException
{
- enterReadLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterReadLock(serviceTypeLockName);
try
{
String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
if (!checkGlobalFlag(serviceActiveFlag))
return null;
- byte[] rval = readServiceData(serviceType, serviceName);
+ byte[] rval = readServiceData(serviceType, serviceName, dataType);
if (rval == null)
rval = new byte[0];
return rval;
}
finally
{
- leaveReadLock(serviceLock);
+ leaveReadLock(serviceTypeLockName);
}
}
@@ -278,7 +299,8 @@ public class BaseLockManager implements
public int countActiveServices(String serviceType)
throws ManifoldCFException
{
- enterReadLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterReadLock(serviceTypeLockName);
try
{
int count = 0;
@@ -297,7 +319,7 @@ public class BaseLockManager implements
}
finally
{
- leaveReadLock(serviceLock);
+ leaveReadLock(serviceTypeLockName);
}
}
@@ -315,7 +337,8 @@ public class BaseLockManager implements
public boolean cleanupInactiveService(String serviceType, IServiceCleanup cleanup)
throws ManifoldCFException
{
- enterWriteLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterWriteLock(serviceTypeLockName);
try
{
// We find ONE service that is registered but inactive, and clean up after that one.
@@ -366,7 +389,7 @@ public class BaseLockManager implements
}
finally
{
- leaveWriteLock(serviceLock);
+ leaveWriteLock(serviceTypeLockName);
}
}
@@ -380,18 +403,19 @@ public class BaseLockManager implements
public void endServiceActivity(String serviceType, String serviceName)
throws ManifoldCFException
{
- enterWriteLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterWriteLock(serviceTypeLockName);
try
{
String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
if (!checkGlobalFlag(serviceActiveFlag))
throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+" is not active");
- writeServiceData(serviceType, serviceName, null);
+ deleteServiceData(serviceType, serviceName);
clearGlobalFlag(serviceActiveFlag);
}
finally
{
- leaveWriteLock(serviceLock);
+ leaveWriteLock(serviceTypeLockName);
}
}
@@ -406,14 +430,15 @@ public class BaseLockManager implements
public boolean checkServiceActive(String serviceType, String serviceName)
throws ManifoldCFException
{
- enterReadLock(serviceLock);
+ String serviceTypeLockName = buildServiceTypeLockName(serviceType);
+ enterReadLock(serviceTypeLockName);
try
{
return checkGlobalFlag(makeActiveServiceFlagName(serviceType, serviceName));
}
finally
{
- leaveReadLock(serviceLock);
+ leaveReadLock(serviceTypeLockName);
}
}
@@ -462,21 +487,82 @@ public class BaseLockManager implements
writeData(serviceCounterName,serviceCounterData);
}
- protected void writeServiceData(String serviceType, String serviceName, byte[] serviceData)
+ protected void writeServiceData(String serviceType, String serviceName, String dataType, byte[] serviceData)
throws ManifoldCFException
{
- writeData(makeServiceDataName(serviceType, serviceName), serviceData);
+ writeData(makeServiceDataName(serviceType, serviceName, dataType), serviceData);
}
- protected byte[] readServiceData(String serviceType, String serviceName)
+ protected byte[] readServiceData(String serviceType, String serviceName, String dataType)
throws ManifoldCFException
{
- return readData(makeServiceDataName(serviceType, serviceName));
+ return readData(makeServiceDataName(serviceType, serviceName, dataType));
}
- protected static String makeServiceDataName(String serviceType, String serviceName)
+ protected void deleteServiceData(String serviceType, String serviceName)
+ throws ManifoldCFException
{
- return serviceDataPrefix + serviceType + "_" + serviceName;
+ List<String> dataTypes = new ArrayList<String>();
+ int i = 0;
+ while (true)
+ {
+ String dataType = readDataType(serviceType, serviceName, i);
+ if (dataType == null)
+ break;
+ dataTypes.add(dataType);
+ i++;
+ }
+ for (String dataType : dataTypes)
+ {
+ writeServiceData(serviceType, serviceName, dataType, null);
+ }
+ while (i > 0)
+ {
+ i--;
+ writeDataType(serviceType, serviceName, i, null);
+ }
+ }
+
+ protected String readDataType(String serviceType, String serviceName, int i)
+ throws ManifoldCFException
+ {
+ String listEntry = buildServiceDataListEntry(serviceType, serviceName, i);
+ byte[] data = readData(listEntry);
+ if (data == null)
+ return null;
+ try
+ {
+ return new String(data, "utf-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException("utf-8 not supported");
+ }
+ }
+
+ protected void writeDataType(String serviceType, String serviceName, int i, String dataType)
+ throws ManifoldCFException
+ {
+ String listEntry = buildServiceDataListEntry(serviceType, serviceName, i);
+ if (dataType == null)
+ writeData(listEntry, null);
+ else
+ {
+ try
+ {
+ byte[] data = dataType.getBytes("utf-8");
+ writeData(listEntry,data);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException("utf-8 not supported");
+ }
+ }
+ }
+
+ protected static String makeServiceDataName(String serviceType, String serviceName, String dataType)
+ {
+ return serviceDataPrefix + serviceType + "_" + serviceName + "_" + dataType;
}
protected static String makeActiveServiceFlagName(String serviceType, String serviceName)
@@ -523,6 +609,16 @@ public class BaseLockManager implements
return serviceListPrefix + serviceType + "_" + i;
}
+ protected static String buildServiceDataListEntry(String serviceType, String serviceName, int i)
+ {
+ return serviceDataListPrefix + serviceType + "_" + serviceName + "_" + i;
+ }
+
+ protected static String buildServiceTypeLockName(String serviceType)
+ {
+ return serviceTypeLockPrefix + serviceType;
+ }
+
/** Get the current shared configuration. This configuration is available in common among all nodes,
* and thus must not be accessed through here for the purpose of finding configuration data that is specific to any one
* specific node.
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/LockManager.java Fri Dec 6 00:39:00 2013
@@ -57,42 +57,43 @@ public class LockManager implements ILoc
*@param serviceType is the type of service.
*@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
* created, and will be returned to the caller.
- *@param serviceData is the initial value of the service's transient data, or null if none.
*@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
* May be null. Local service cleanup is never called if the serviceName argument is null.
*@return the actual service name.
*/
@Override
- public String registerServiceBeginServiceActivity(String serviceType, String serviceName, byte[] serviceData,
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName,
IServiceCleanup cleanup)
throws ManifoldCFException
{
- return lockManager.registerServiceBeginServiceActivity(serviceType, serviceName, serviceData, cleanup);
+ return lockManager.registerServiceBeginServiceActivity(serviceType, serviceName, cleanup);
}
- /** Update service data for a service.
+ /** Set service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@param serviceData is the data to update to (may be null).
* This updates the service's transient data (or deletes it). If the service is not active, an exception is thrown.
*/
@Override
- public void updateServiceData(String serviceType, String serviceName, byte[] serviceData)
+ public void updateServiceData(String serviceType, String serviceName, String dataType, byte[] serviceData)
throws ManifoldCFException
{
- lockManager.updateServiceData(serviceType, serviceName, serviceData);
+ lockManager.updateServiceData(serviceType, serviceName, dataType, serviceData);
}
/** Retrieve service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@return the service's transient data.
*/
@Override
- public byte[] retrieveServiceData(String serviceType, String serviceName)
+ public byte[] retrieveServiceData(String serviceType, String serviceName, String dataType)
throws ManifoldCFException
{
- return lockManager.retrieveServiceData(serviceType, serviceName);
+ return lockManager.retrieveServiceData(serviceType, serviceName, dataType);
}
/** Clean up any inactive services found.
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java Fri Dec 6 00:39:00 2013
@@ -106,15 +106,16 @@ public class ZooKeeperConnection
*@param nodePath is the path of the node.
*@return the data, if the node if exists, otherwise null.
*/
- public byte[] getNodeData(String nodePath)
+ public byte[] getNodeData(String nodePath, String childName)
throws ManifoldCFException, InterruptedException
{
try
{
- Stat s = zookeeper.exists(nodePath,false);
+ String combinedPath = nodePath + "/" + childName;
+ Stat s = zookeeper.exists(combinedPath,false);
if (s == null)
return null;
- return zookeeper.getData(nodePath,false,s);
+ return zookeeper.getData(combinedPath,false,s);
}
catch (KeeperException e)
{
@@ -124,12 +125,39 @@ public class ZooKeeperConnection
/** Set node data.
*/
- public void setNodeData(String nodePath, byte[] data)
+ public void setNodeData(String nodePath, String childName, byte[] data)
throws ManifoldCFException, InterruptedException
{
try
{
- zookeeper.setData(nodePath,data,-1);
+ String combinedPath = nodePath + "/" + childName;
+ while (true)
+ {
+ try
+ {
+ zookeeper.setData(combinedPath, data, -1);
+ return;
+ }
+ catch (KeeperException.NoNodeException e1)
+ {
+ // Create parent unless it already exists
+ try
+ {
+ zookeeper.create(combinedPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return;
+ }
+ catch (KeeperException.NoNodeException e)
+ {
+ try
+ {
+ zookeeper.create(nodePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ catch (KeeperException.NodeExistsException e2)
+ {
+ }
+ }
+ }
+ }
}
catch (KeeperException e)
{
@@ -152,6 +180,28 @@ public class ZooKeeperConnection
}
}
+ /** Delete all a node's children.
+ */
+ public void deleteNodeChildren(String nodePath)
+ throws ManifoldCFException, InterruptedException
+ {
+ try
+ {
+ List<String> children = zookeeper.getChildren(nodePath,false);
+ for (String child : children)
+ {
+ zookeeper.delete(nodePath + "/" + child,-1);
+ }
+ }
+ catch (KeeperException.NoNodeException e)
+ {
+ }
+ catch (KeeperException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),e);
+ }
+ }
+
/** Get the relative paths of all node's children. If the node does not exist,
* return an empty list.
*/
Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java?rev=1548340&r1=1548339&r2=1548340&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java Fri Dec 6 00:39:00 2013
@@ -43,6 +43,7 @@ public class ZooKeeperLockManager extend
private final static String SERVICETYPE_LOCK_PATH_PREFIX = "/org.apache.manifoldcf.servicelock-";
private final static String SERVICETYPE_ACTIVE_PATH_PREFIX = "/org.apache.manifoldcf.serviceactive-";
private final static String SERVICETYPE_REGISTER_PATH_PREFIX = "/org.apache.manifoldcf.service-";
+ private final static String SERVICETYPE_DATA_PATH_PREFIX = "/org.apache.manifoldcf.servicedata-";
/** Anonymous service name prefix, to be followed by an integer */
protected final static String anonymousServiceNamePrefix = "_ANON_";
@@ -107,13 +108,12 @@ public class ZooKeeperLockManager extend
*@param serviceType is the type of service.
*@param serviceName is the name of the service to register. If null is passed, a transient unique service name will be
* created, and will be returned to the caller.
- *@param serviceData is the initial value of the service's transient data, or null if none.
*@param cleanup is called to clean up either the current service, or all services of this type, if no other active service exists.
* May be null. Local service cleanup is never called if the serviceName argument is null.
*@return the actual service name.
*/
@Override
- public String registerServiceBeginServiceActivity(String serviceType, String serviceName, byte[] serviceData,
+ public String registerServiceBeginServiceActivity(String serviceType, String serviceName,
IServiceCleanup cleanup)
throws ManifoldCFException
{
@@ -187,7 +187,7 @@ public class ZooKeeperLockManager extend
}
// Last, set the appropriate active flag
- connection.createNode(activePath, (serviceData==null)?new byte[0]:serviceData);
+ connection.createNode(activePath, null);
return serviceName;
}
finally
@@ -206,13 +206,15 @@ public class ZooKeeperLockManager extend
}
}
- /** Update service data for a service.
+ /** Set service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@param serviceData is the data to update to (may be null).
* This updates the service's transient data (or deletes it). If the service is not active, an exception is thrown.
*/
- public void updateServiceData(String serviceType, String serviceName, byte[] serviceData)
+ @Override
+ public void updateServiceData(String serviceType, String serviceName, String dataType, byte[] serviceData)
throws ManifoldCFException
{
try
@@ -223,7 +225,8 @@ public class ZooKeeperLockManager extend
enterServiceRegistryLock(connection, serviceType);
try
{
- connection.setNodeData(buildServiceTypeActivePath(serviceType, serviceName), (serviceData==null)?new byte[0]:serviceData);
+ String dataRootPath = buildServiceTypeDataPath(serviceType, serviceName);
+ connection.setNodeData(dataRootPath, dataType, (serviceData==null)?new byte[0]:serviceData);
}
finally
{
@@ -244,9 +247,11 @@ public class ZooKeeperLockManager extend
/** Retrieve service data for a service.
*@param serviceType is the type of service.
*@param serviceName is the name of the service.
+ *@param dataType is the type of data.
*@return the service's transient data.
*/
- public byte[] retrieveServiceData(String serviceType, String serviceName)
+ @Override
+ public byte[] retrieveServiceData(String serviceType, String serviceName, String dataType)
throws ManifoldCFException
{
try
@@ -257,7 +262,8 @@ public class ZooKeeperLockManager extend
enterServiceRegistryLock(connection, serviceType);
try
{
- return connection.getNodeData(buildServiceTypeActivePath(serviceType, serviceName));
+ String dataRootPath = buildServiceTypeDataPath(serviceType, serviceName);
+ return connection.getNodeData(dataRootPath, dataType);
}
finally
{
@@ -399,6 +405,7 @@ public class ZooKeeperLockManager extend
enterServiceRegistryLock(connection, serviceType);
try
{
+ connection.deleteNodeChildren(buildServiceTypeDataPath(serviceType,serviceName));
connection.deleteNode(buildServiceTypeActivePath(serviceType, serviceName));
}
finally
@@ -539,6 +546,12 @@ public class ZooKeeperLockManager extend
return SERVICETYPE_REGISTER_PATH_PREFIX + serviceType;
}
+ /** Build a zk path of a node to have child nodes with service data */
+ protected static String buildServiceTypeDataPath(String serviceType, String serviceName)
+ {
+ return SERVICETYPE_DATA_PATH_PREFIX + serviceType + "-" + serviceName;
+ }
+
// Shared configuration
/** Get the current shared configuration. This configuration is available in common among all nodes,