You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/11/28 02:41:31 UTC
svn commit: r1642255 - in /manifoldcf/trunk/framework:
agents/src/main/java/org/apache/manifoldcf/agents/outputconnection/
agents/src/main/java/org/apache/manifoldcf/agents/transformationconnection/
pull-agent/src/main/java/org/apache/manifoldcf/author...
Author: kwright
Date: Fri Nov 28 01:41:31 2014
New Revision: 1642255
URL: http://svn.apache.org/r1642255
Log:
Real fix for CONNECTORS-1116.
Modified:
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnection/OutputConnectionManager.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformationconnection/TransformationConnectionManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/authority/AuthorityConnectionManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/mapping/MappingConnectionManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/repository/RepositoryConnectionManager.java
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnection/OutputConnectionManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnection/OutputConnectionManager.java?rev=1642255&r1=1642254&r2=1642255&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnection/OutputConnectionManager.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/outputconnection/OutputConnectionManager.java Fri Nov 28 01:41:31 2014
@@ -62,7 +62,11 @@ public class OutputConnectionManager ext
protected final ICacheManager cacheManager;
// Thread context
protected final IThreadContext threadContext;
-
+ // Lock manager
+ protected final ILockManager lockManager;
+
+ protected final static String outputsLock = "OUTPUTS_LOCK";
+
/** Constructor.
*@param threadContext is the thread context.
*/
@@ -72,6 +76,7 @@ public class OutputConnectionManager ext
super(database,"outputconnections");
cacheManager = CacheManagerFactory.make(threadContext);
+ lockManager = LockManagerFactory.make(threadContext);
this.threadContext = threadContext;
}
@@ -186,7 +191,7 @@ public class OutputConnectionManager ext
public IOutputConnection[] getAllConnections()
throws ManifoldCFException
{
- beginTransaction();
+ lockManager.enterReadLock(outputsLock);
try
{
// Read all the tools
@@ -203,19 +208,9 @@ public class OutputConnectionManager ext
}
return loadMultiple(names);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(outputsLock);
}
}
@@ -293,108 +288,116 @@ public class OutputConnectionManager ext
public boolean save(IOutputConnection object)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getOutputConnectionsKey());
- ssb.add(getOutputConnectionKey(object.getName()));
- StringSet cacheKeys = new StringSet(ssb);
- while (true)
+ lockManager.enterWriteLock(outputsLock);
+ try
{
- // Catch deadlock condition
- long sleepAmt = 0L;
- try
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getOutputConnectionsKey());
+ ssb.add(getOutputConnectionKey(object.getName()));
+ StringSet cacheKeys = new StringSet(ssb);
+ while (true)
{
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ // Catch deadlock condition
+ long sleepAmt = 0L;
try
{
- beginTransaction();
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- //performLock();
- // Notify of a change to the configuration
- ManifoldCF.noteConfigurationChange();
- boolean isNew = object.getIsNew();
- // See whether the instance exists
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,object.getName())});
- IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
- HashMap values = new HashMap();
- values.put(descriptionField,object.getDescription());
- values.put(classNameField,object.getClassName());
- values.put(maxCountField,new Long((long)object.getMaxConnections()));
- String configXML = object.getConfigParams().toXML();
- values.put(configField,configXML);
- boolean notificationNeeded = false;
- boolean isCreated;
-
- if (set.getRowCount() > 0)
+ beginTransaction();
+ try
{
- // If the object is supposedly new, it is bad that we found one that already exists.
- if (isNew)
- throw new ManifoldCFException("Output connection '"+object.getName()+"' already exists");
- isCreated = false;
- IResultRow row = set.getRow(0);
- String oldXML = (String)row.getValue(configField);
- if (oldXML == null || !oldXML.equals(configXML))
- notificationNeeded = true;
-
- // Update
- params.clear();
- query = buildConjunctionClause(params,new ClauseDescription[]{
+ //performLock();
+ // Notify of a change to the configuration
+ ManifoldCF.noteConfigurationChange();
+ boolean isNew = object.getIsNew();
+ // See whether the instance exists
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(nameField,object.getName())});
- performUpdate(values," WHERE "+query,params,null);
+ IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
+ query+" FOR UPDATE",params,null,null);
+ HashMap values = new HashMap();
+ values.put(descriptionField,object.getDescription());
+ values.put(classNameField,object.getClassName());
+ values.put(maxCountField,new Long((long)object.getMaxConnections()));
+ String configXML = object.getConfigParams().toXML();
+ values.put(configField,configXML);
+ boolean notificationNeeded = false;
+ boolean isCreated;
+
+ if (set.getRowCount() > 0)
+ {
+ // If the object is supposedly new, it is bad that we found one that already exists.
+ if (isNew)
+ throw new ManifoldCFException("Output connection '"+object.getName()+"' already exists");
+ isCreated = false;
+ IResultRow row = set.getRow(0);
+ String oldXML = (String)row.getValue(configField);
+ if (oldXML == null || !oldXML.equals(configXML))
+ notificationNeeded = true;
+
+ // Update
+ params.clear();
+ query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,object.getName())});
+ performUpdate(values," WHERE "+query,params,null);
+ }
+ else
+ {
+ // If the object is not supposed to be new, it is bad that we did not find one.
+ if (!isNew)
+ throw new ManifoldCFException("Output connection '"+object.getName()+"' no longer exists");
+ isCreated = true;
+ // Insert
+ values.put(nameField,object.getName());
+ // We only need the general key because this is new.
+ performInsert(values,null);
+ }
+
+ // If notification required, do it.
+ if (notificationNeeded)
+ AgentManagerFactory.noteOutputConnectionChange(threadContext,object.getName());
+
+ cacheManager.invalidateKeys(ch);
+ return isCreated;
}
- else
+ catch (ManifoldCFException e)
{
- // If the object is not supposed to be new, it is bad that we did not find one.
- if (!isNew)
- throw new ManifoldCFException("Output connection '"+object.getName()+"' no longer exists");
- isCreated = true;
- // Insert
- values.put(nameField,object.getName());
- // We only need the general key because this is new.
- performInsert(values,null);
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
}
-
- // If notification required, do it.
- if (notificationNeeded)
- AgentManagerFactory.noteOutputConnectionChange(threadContext,object.getName());
-
- cacheManager.invalidateKeys(ch);
- return isCreated;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
+ catch (ManifoldCFException e)
+ {
+ // Is this a deadlock exception? If so, we want to try again.
+ if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ sleepAmt = getSleepAmt();
+ }
finally
{
- cacheManager.leaveCache(ch);
+ sleepFor(sleepAmt);
}
}
- catch (ManifoldCFException e)
- {
- // Is this a deadlock exception? If so, we want to try again.
- if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- sleepAmt = getSleepAmt();
- }
- finally
- {
- sleepFor(sleepAmt);
- }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(outputsLock);
}
}
@@ -405,46 +408,53 @@ public class OutputConnectionManager ext
public void delete(String name)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getOutputConnectionsKey());
- ssb.add(getOutputConnectionKey(name));
- StringSet cacheKeys = new StringSet(ssb);
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ lockManager.enterWriteLock(outputsLock);
try
{
- beginTransaction();
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getOutputConnectionsKey());
+ ssb.add(getOutputConnectionKey(name));
+ StringSet cacheKeys = new StringSet(ssb);
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- // Check if anything refers to this connection name
- if (AgentManagerFactory.isOutputConnectionInUse(threadContext,name))
- throw new ManifoldCFException("Can't delete output connection '"+name+"': existing entities refer to it");
- ManifoldCF.noteConfigurationChange();
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,name)});
- performDelete("WHERE "+query,params,null);
- cacheManager.invalidateKeys(ch);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ beginTransaction();
+ try
+ {
+ // Check if anything refers to this connection name
+ if (AgentManagerFactory.isOutputConnectionInUse(threadContext,name))
+ throw new ManifoldCFException("Can't delete output connection '"+name+"': existing entities refer to it");
+ ManifoldCF.noteConfigurationChange();
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,name)});
+ performDelete("WHERE "+query,params,null);
+ cacheManager.invalidateKeys(ch);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
finally
{
- cacheManager.leaveCache(ch);
+ lockManager.leaveWriteLock(outputsLock);
}
-
}
/** Get a list of output connections that share the same connector.
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformationconnection/TransformationConnectionManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformationconnection/TransformationConnectionManager.java?rev=1642255&r1=1642254&r2=1642255&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformationconnection/TransformationConnectionManager.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/transformationconnection/TransformationConnectionManager.java Fri Nov 28 01:41:31 2014
@@ -62,7 +62,11 @@ public class TransformationConnectionMan
protected final ICacheManager cacheManager;
// Thread context
protected final IThreadContext threadContext;
-
+ // Lock manager
+ protected final ILockManager lockManager;
+
+ protected final static String transformationsLock = "TRANSFORMATIONS_LOCK";
+
/** Constructor.
*@param threadContext is the thread context.
*/
@@ -72,6 +76,7 @@ public class TransformationConnectionMan
super(database,"transformationconnections");
cacheManager = CacheManagerFactory.make(threadContext);
+ lockManager = LockManagerFactory.make(threadContext);
this.threadContext = threadContext;
}
@@ -186,7 +191,7 @@ public class TransformationConnectionMan
public ITransformationConnection[] getAllConnections()
throws ManifoldCFException
{
- beginTransaction();
+ lockManager.enterReadLock(transformationsLock);
try
{
// Read all the tools
@@ -203,19 +208,9 @@ public class TransformationConnectionMan
}
return loadMultiple(names);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(transformationsLock);
}
}
@@ -293,108 +288,116 @@ public class TransformationConnectionMan
public boolean save(ITransformationConnection object)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getTransformationConnectionsKey());
- ssb.add(getTransformationConnectionKey(object.getName()));
- StringSet cacheKeys = new StringSet(ssb);
- while (true)
+ lockManager.enterWriteLock(transformationsLock);
+ try
{
- // Catch deadlock condition
- long sleepAmt = 0L;
- try
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getTransformationConnectionsKey());
+ ssb.add(getTransformationConnectionKey(object.getName()));
+ StringSet cacheKeys = new StringSet(ssb);
+ while (true)
{
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ // Catch deadlock condition
+ long sleepAmt = 0L;
try
{
- beginTransaction();
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- //performLock();
- // Notify of a change to the configuration
- ManifoldCF.noteConfigurationChange();
- boolean isNew = object.getIsNew();
- // See whether the instance exists
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,object.getName())});
- IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
- HashMap values = new HashMap();
- values.put(descriptionField,object.getDescription());
- values.put(classNameField,object.getClassName());
- values.put(maxCountField,new Long((long)object.getMaxConnections()));
- String configXML = object.getConfigParams().toXML();
- values.put(configField,configXML);
- boolean notificationNeeded = false;
- boolean isCreated;
-
- if (set.getRowCount() > 0)
+ beginTransaction();
+ try
{
- // If the object is supposedly new, it is bad that we found one that already exists.
- if (isNew)
- throw new ManifoldCFException("Transformation connection '"+object.getName()+"' already exists");
- isCreated = false;
- IResultRow row = set.getRow(0);
- String oldXML = (String)row.getValue(configField);
- if (oldXML == null || !oldXML.equals(configXML))
- notificationNeeded = true;
-
- // Update
- params.clear();
- query = buildConjunctionClause(params,new ClauseDescription[]{
+ //performLock();
+ // Notify of a change to the configuration
+ ManifoldCF.noteConfigurationChange();
+ boolean isNew = object.getIsNew();
+ // See whether the instance exists
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(nameField,object.getName())});
- performUpdate(values," WHERE "+query,params,null);
+ IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
+ query+" FOR UPDATE",params,null,null);
+ HashMap values = new HashMap();
+ values.put(descriptionField,object.getDescription());
+ values.put(classNameField,object.getClassName());
+ values.put(maxCountField,new Long((long)object.getMaxConnections()));
+ String configXML = object.getConfigParams().toXML();
+ values.put(configField,configXML);
+ boolean notificationNeeded = false;
+ boolean isCreated;
+
+ if (set.getRowCount() > 0)
+ {
+ // If the object is supposedly new, it is bad that we found one that already exists.
+ if (isNew)
+ throw new ManifoldCFException("Transformation connection '"+object.getName()+"' already exists");
+ isCreated = false;
+ IResultRow row = set.getRow(0);
+ String oldXML = (String)row.getValue(configField);
+ if (oldXML == null || !oldXML.equals(configXML))
+ notificationNeeded = true;
+
+ // Update
+ params.clear();
+ query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,object.getName())});
+ performUpdate(values," WHERE "+query,params,null);
+ }
+ else
+ {
+ // If the object is not supposed to be new, it is bad that we did not find one.
+ if (!isNew)
+ throw new ManifoldCFException("Transformation connection '"+object.getName()+"' no longer exists");
+ isCreated = true;
+ // Insert
+ values.put(nameField,object.getName());
+ // We only need the general key because this is new.
+ performInsert(values,null);
+ }
+
+ // If notification required, do it.
+ if (notificationNeeded)
+ AgentManagerFactory.noteTransformationConnectionChange(threadContext,object.getName());
+
+ cacheManager.invalidateKeys(ch);
+ return isCreated;
}
- else
+ catch (ManifoldCFException e)
{
- // If the object is not supposed to be new, it is bad that we did not find one.
- if (!isNew)
- throw new ManifoldCFException("Transformation connection '"+object.getName()+"' no longer exists");
- isCreated = true;
- // Insert
- values.put(nameField,object.getName());
- // We only need the general key because this is new.
- performInsert(values,null);
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
}
-
- // If notification required, do it.
- if (notificationNeeded)
- AgentManagerFactory.noteTransformationConnectionChange(threadContext,object.getName());
-
- cacheManager.invalidateKeys(ch);
- return isCreated;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
+ catch (ManifoldCFException e)
+ {
+ // Is this a deadlock exception? If so, we want to try again.
+ if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ sleepAmt = getSleepAmt();
+ }
finally
{
- cacheManager.leaveCache(ch);
+ sleepFor(sleepAmt);
}
}
- catch (ManifoldCFException e)
- {
- // Is this a deadlock exception? If so, we want to try again.
- if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- sleepAmt = getSleepAmt();
- }
- finally
- {
- sleepFor(sleepAmt);
- }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(transformationsLock);
}
}
@@ -405,46 +408,53 @@ public class TransformationConnectionMan
public void delete(String name)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getTransformationConnectionsKey());
- ssb.add(getTransformationConnectionKey(name));
- StringSet cacheKeys = new StringSet(ssb);
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ lockManager.enterWriteLock(transformationsLock);
try
{
- beginTransaction();
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getTransformationConnectionsKey());
+ ssb.add(getTransformationConnectionKey(name));
+ StringSet cacheKeys = new StringSet(ssb);
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- // Check if anything refers to this connection name
- if (AgentManagerFactory.isTransformationConnectionInUse(threadContext,name))
- throw new ManifoldCFException("Can't delete transformation connection '"+name+"': existing entities refer to it");
- ManifoldCF.noteConfigurationChange();
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,name)});
- performDelete("WHERE "+query,params,null);
- cacheManager.invalidateKeys(ch);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ beginTransaction();
+ try
+ {
+ // Check if anything refers to this connection name
+ if (AgentManagerFactory.isTransformationConnectionInUse(threadContext,name))
+ throw new ManifoldCFException("Can't delete transformation connection '"+name+"': existing entities refer to it");
+ ManifoldCF.noteConfigurationChange();
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,name)});
+ performDelete("WHERE "+query,params,null);
+ cacheManager.invalidateKeys(ch);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
finally
{
- cacheManager.leaveCache(ch);
+ lockManager.leaveWriteLock(transformationsLock);
}
-
}
/** Get a list of output connections that share the same connector.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/authority/AuthorityConnectionManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/authority/AuthorityConnectionManager.java?rev=1642255&r1=1642254&r2=1642255&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/authority/AuthorityConnectionManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/authority/AuthorityConnectionManager.java Fri Nov 28 01:41:31 2014
@@ -63,10 +63,14 @@ public class AuthorityConnectionManager
protected final static String groupNameField = "groupname";
// Cache manager
- ICacheManager cacheManager;
+ protected final ICacheManager cacheManager;
// Thread context
- IThreadContext threadContext;
-
+ protected final IThreadContext threadContext;
+ // Lock manager
+ protected final ILockManager lockManager;
+
+ protected final static String authoritiesLock = "AUTHORITIES_LOCK";
+
/** Constructor.
*@param threadContext is the thread context.
*/
@@ -76,6 +80,7 @@ public class AuthorityConnectionManager
super(database,"authconnections");
cacheManager = CacheManagerFactory.make(threadContext);
+ lockManager = LockManagerFactory.make(threadContext);
this.threadContext = threadContext;
}
@@ -236,7 +241,7 @@ public class AuthorityConnectionManager
public IAuthorityConnection[] getDomainConnections(String authDomain)
throws ManifoldCFException
{
- beginTransaction();
+ lockManager.enterReadLock(authoritiesLock);
try
{
// Read the connections for the domain
@@ -256,19 +261,9 @@ public class AuthorityConnectionManager
}
return loadMultiple(names);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(authoritiesLock);
}
}
@@ -279,7 +274,7 @@ public class AuthorityConnectionManager
public IAuthorityConnection[] getAllConnections()
throws ManifoldCFException
{
- beginTransaction();
+ lockManager.enterReadLock(authoritiesLock);
try
{
// Read all the tools
@@ -296,19 +291,9 @@ public class AuthorityConnectionManager
}
return loadMultiple(names);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(authoritiesLock);
}
}
@@ -390,99 +375,107 @@ public class AuthorityConnectionManager
public boolean save(IAuthorityConnection object)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getAuthorityConnectionsKey());
- ssb.add(getAuthorityConnectionKey(object.getName()));
- StringSet cacheKeys = new StringSet(ssb);
- while (true)
+ lockManager.enterWriteLock(authoritiesLock);
+ try
{
- long sleepAmt = 0L;
- try
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getAuthorityConnectionsKey());
+ ssb.add(getAuthorityConnectionKey(object.getName()));
+ StringSet cacheKeys = new StringSet(ssb);
+ while (true)
{
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ long sleepAmt = 0L;
try
{
- beginTransaction();
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- //performLock();
- ManifoldCF.noteConfigurationChange();
- boolean isNew = object.getIsNew();
- // See whether the instance exists
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,object.getName())});
- IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
- HashMap values = new HashMap();
- values.put(descriptionField,object.getDescription());
- values.put(classNameField,object.getClassName());
- values.put(maxCountField,new Long((long)object.getMaxConnections()));
- values.put(configField,object.getConfigParams().toXML());
- values.put(mappingField,object.getPrerequisiteMapping());
- values.put(authDomainField,object.getAuthDomain());
- values.put(groupNameField,object.getAuthGroup());
-
- boolean isCreated;
-
- if (set.getRowCount() > 0)
+ beginTransaction();
+ try
{
- // If the object is supposedly new, it is bad that we found one that already exists.
- if (isNew)
- throw new ManifoldCFException("Authority connection '"+object.getName()+"' already exists");
- isCreated = false;
- // Update
- params.clear();
- query = buildConjunctionClause(params,new ClauseDescription[]{
+ //performLock();
+ ManifoldCF.noteConfigurationChange();
+ boolean isNew = object.getIsNew();
+ // See whether the instance exists
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(nameField,object.getName())});
- performUpdate(values," WHERE "+query,params,null);
+ IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
+ query+" FOR UPDATE",params,null,null);
+ HashMap values = new HashMap();
+ values.put(descriptionField,object.getDescription());
+ values.put(classNameField,object.getClassName());
+ values.put(maxCountField,new Long((long)object.getMaxConnections()));
+ values.put(configField,object.getConfigParams().toXML());
+ values.put(mappingField,object.getPrerequisiteMapping());
+ values.put(authDomainField,object.getAuthDomain());
+ values.put(groupNameField,object.getAuthGroup());
+
+ boolean isCreated;
+
+ if (set.getRowCount() > 0)
+ {
+ // If the object is supposedly new, it is bad that we found one that already exists.
+ if (isNew)
+ throw new ManifoldCFException("Authority connection '"+object.getName()+"' already exists");
+ isCreated = false;
+ // Update
+ params.clear();
+ query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,object.getName())});
+ performUpdate(values," WHERE "+query,params,null);
+ }
+ else
+ {
+ // If the object is not supposed to be new, it is bad that we did not find one.
+ if (!isNew)
+ throw new ManifoldCFException("Authority connection '"+object.getName()+"' no longer exists");
+ isCreated = true;
+ // Insert
+ values.put(nameField,object.getName());
+ // We only need the general key because this is new.
+ performInsert(values,null);
+ }
+
+ cacheManager.invalidateKeys(ch);
+ return isCreated;
}
- else
+ catch (ManifoldCFException e)
{
- // If the object is not supposed to be new, it is bad that we did not find one.
- if (!isNew)
- throw new ManifoldCFException("Authority connection '"+object.getName()+"' no longer exists");
- isCreated = true;
- // Insert
- values.put(nameField,object.getName());
- // We only need the general key because this is new.
- performInsert(values,null);
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
}
-
- cacheManager.invalidateKeys(ch);
- return isCreated;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
+ catch (ManifoldCFException e)
+ {
+ // Is this a deadlock exception? If so, we want to try again.
+ if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ sleepAmt = getSleepAmt();
+ }
finally
{
- cacheManager.leaveCache(ch);
+ sleepFor(sleepAmt);
}
}
- catch (ManifoldCFException e)
- {
- // Is this a deadlock exception? If so, we want to try again.
- if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- sleepAmt = getSleepAmt();
- }
- finally
- {
- sleepFor(sleepAmt);
- }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(authoritiesLock);
}
}
@@ -494,44 +487,50 @@ public class AuthorityConnectionManager
public void delete(String name)
throws ManifoldCFException
{
-
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getAuthorityConnectionsKey());
- ssb.add(getAuthorityConnectionKey(name));
- StringSet cacheKeys = new StringSet(ssb);
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ lockManager.enterWriteLock(authoritiesLock);
try
{
- beginTransaction();
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getAuthorityConnectionsKey());
+ ssb.add(getAuthorityConnectionKey(name));
+ StringSet cacheKeys = new StringSet(ssb);
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- ManifoldCF.noteConfigurationChange();
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,name)});
- performDelete("WHERE "+query,params,null);
- cacheManager.invalidateKeys(ch);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ beginTransaction();
+ try
+ {
+ ManifoldCF.noteConfigurationChange();
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,name)});
+ performDelete("WHERE "+query,params,null);
+ cacheManager.invalidateKeys(ch);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
finally
{
- cacheManager.leaveCache(ch);
+ lockManager.leaveWriteLock(authoritiesLock);
}
-
}
/** Get the authority connection name column.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/mapping/MappingConnectionManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/mapping/MappingConnectionManager.java?rev=1642255&r1=1642254&r2=1642255&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/mapping/MappingConnectionManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/authorities/mapping/MappingConnectionManager.java Fri Nov 28 01:41:31 2014
@@ -55,10 +55,14 @@ public class MappingConnectionManager ex
protected final static String mappingField = "mappingname";
// Cache manager
- ICacheManager cacheManager;
+ protected final ICacheManager cacheManager;
// Thread context
- IThreadContext threadContext;
-
+ protected final IThreadContext threadContext;
+ // Lock manager
+ protected final ILockManager lockManager;
+
+ protected final static String mappingsLock = "MAPPINGS_LOCK";
+
/** Constructor.
*@param threadContext is the thread context.
*/
@@ -68,6 +72,7 @@ public class MappingConnectionManager ex
super(database,"mapconnections");
cacheManager = CacheManagerFactory.make(threadContext);
+ lockManager = LockManagerFactory.make(threadContext);
this.threadContext = threadContext;
}
@@ -288,7 +293,7 @@ public class MappingConnectionManager ex
public IMappingConnection[] getAllConnections()
throws ManifoldCFException
{
- beginTransaction();
+ lockManager.enterReadLock(mappingsLock);
try
{
// Read all the tools
@@ -305,19 +310,9 @@ public class MappingConnectionManager ex
}
return loadMultiple(names);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(mappingsLock);
}
}
@@ -399,97 +394,105 @@ public class MappingConnectionManager ex
public boolean save(IMappingConnection object)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getMappingConnectionsKey());
- ssb.add(getMappingConnectionKey(object.getName()));
- StringSet cacheKeys = new StringSet(ssb);
- while (true)
+ lockManager.enterWriteLock(mappingsLock);
+ try
{
- long sleepAmt = 0L;
- try
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getMappingConnectionsKey());
+ ssb.add(getMappingConnectionKey(object.getName()));
+ StringSet cacheKeys = new StringSet(ssb);
+ while (true)
{
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ long sleepAmt = 0L;
try
{
- beginTransaction();
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- //performLock();
- ManifoldCF.noteConfigurationChange();
- boolean isNew = object.getIsNew();
- // See whether the instance exists
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,object.getName())});
- IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
- HashMap values = new HashMap();
- values.put(descriptionField,object.getDescription());
- values.put(classNameField,object.getClassName());
- values.put(maxCountField,new Long((long)object.getMaxConnections()));
- values.put(configField,object.getConfigParams().toXML());
- values.put(mappingField,object.getPrerequisiteMapping());
-
- boolean isCreated;
-
- if (set.getRowCount() > 0)
+ beginTransaction();
+ try
{
- // If the object is supposedly new, it is bad that we found one that already exists.
- if (isNew)
- throw new ManifoldCFException("Authority connection '"+object.getName()+"' already exists");
- isCreated = false;
- // Update
- params.clear();
- query = buildConjunctionClause(params,new ClauseDescription[]{
+ //performLock();
+ ManifoldCF.noteConfigurationChange();
+ boolean isNew = object.getIsNew();
+ // See whether the instance exists
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(nameField,object.getName())});
- performUpdate(values," WHERE "+query,params,null);
+ IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
+ query+" FOR UPDATE",params,null,null);
+ HashMap values = new HashMap();
+ values.put(descriptionField,object.getDescription());
+ values.put(classNameField,object.getClassName());
+ values.put(maxCountField,new Long((long)object.getMaxConnections()));
+ values.put(configField,object.getConfigParams().toXML());
+ values.put(mappingField,object.getPrerequisiteMapping());
+
+ boolean isCreated;
+
+ if (set.getRowCount() > 0)
+ {
+ // If the object is supposedly new, it is bad that we found one that already exists.
+ if (isNew)
+ throw new ManifoldCFException("Authority connection '"+object.getName()+"' already exists");
+ isCreated = false;
+ // Update
+ params.clear();
+ query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,object.getName())});
+ performUpdate(values," WHERE "+query,params,null);
+ }
+ else
+ {
+ // If the object is not supposed to be new, it is bad that we did not find one.
+ if (!isNew)
+ throw new ManifoldCFException("Mapping connection '"+object.getName()+"' no longer exists");
+ isCreated = true;
+ // Insert
+ values.put(nameField,object.getName());
+ // We only need the general key because this is new.
+ performInsert(values,null);
+ }
+
+ cacheManager.invalidateKeys(ch);
+ return isCreated;
}
- else
+ catch (ManifoldCFException e)
{
- // If the object is not supposed to be new, it is bad that we did not find one.
- if (!isNew)
- throw new ManifoldCFException("Mapping connection '"+object.getName()+"' no longer exists");
- isCreated = true;
- // Insert
- values.put(nameField,object.getName());
- // We only need the general key because this is new.
- performInsert(values,null);
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
}
-
- cacheManager.invalidateKeys(ch);
- return isCreated;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
+ catch (ManifoldCFException e)
+ {
+ // Is this a deadlock exception? If so, we want to try again.
+ if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ sleepAmt = getSleepAmt();
+ }
finally
{
- cacheManager.leaveCache(ch);
+ sleepFor(sleepAmt);
}
}
- catch (ManifoldCFException e)
- {
- // Is this a deadlock exception? If so, we want to try again.
- if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- sleepAmt = getSleepAmt();
- }
- finally
- {
- sleepFor(sleepAmt);
- }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(mappingsLock);
}
}
@@ -505,48 +508,55 @@ public class MappingConnectionManager ex
// Grab authority connection manager handle, to check on legality of deletion.
IAuthorityConnectionManager authManager = AuthorityConnectionManagerFactory.make(threadContext);
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getMappingConnectionsKey());
- ssb.add(getMappingConnectionKey(name));
- StringSet cacheKeys = new StringSet(ssb);
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ lockManager.enterWriteLock(mappingsLock);
try
{
- beginTransaction();
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getMappingConnectionsKey());
+ ssb.add(getMappingConnectionKey(name));
+ StringSet cacheKeys = new StringSet(ssb);
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- // Check if any other mapping refers to this connection name
- if (isReferenced(name))
- throw new ManifoldCFException("Can't delete mapping connection '"+name+"': existing mapping connections refer to it");
- if (authManager.isMappingReferenced(name))
- throw new ManifoldCFException("Can't delete mapping connection '"+name+"': existing authority connections refer to it");
- ManifoldCF.noteConfigurationChange();
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,name)});
- performDelete("WHERE "+query,params,null);
- cacheManager.invalidateKeys(ch);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ beginTransaction();
+ try
+ {
+ // Check if any other mapping refers to this connection name
+ if (isReferenced(name))
+ throw new ManifoldCFException("Can't delete mapping connection '"+name+"': existing mapping connections refer to it");
+ if (authManager.isMappingReferenced(name))
+ throw new ManifoldCFException("Can't delete mapping connection '"+name+"': existing authority connections refer to it");
+ ManifoldCF.noteConfigurationChange();
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,name)});
+ performDelete("WHERE "+query,params,null);
+ cacheManager.invalidateKeys(ch);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
finally
{
- cacheManager.leaveCache(ch);
+ lockManager.leaveWriteLock(mappingsLock);
}
-
}
/** Get the mapping connection name column.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1642255&r1=1642254&r2=1642255&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Fri Nov 28 01:41:31 2014
@@ -350,8 +350,11 @@ public class Jobs extends org.apache.man
protected final IRepositoryConnectionManager connectionMgr;
protected final ITransformationConnectionManager transMgr;
+ protected final ILockManager lockManager;
+
protected final IThreadContext threadContext;
+ protected final static String jobsLock = "JOBS_LOCK";
/** Constructor.
*@param database is the database handle.
*/
@@ -365,7 +368,8 @@ public class Jobs extends org.apache.man
pipelineManager = new PipelineManager(threadContext,database);
cacheManager = CacheManagerFactory.make(threadContext);
-
+ lockManager = LockManagerFactory.make(threadContext);
+
outputMgr = OutputConnectionManagerFactory.make(threadContext);
connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
transMgr = TransformationConnectionManagerFactory.make(threadContext);
@@ -657,7 +661,7 @@ public class Jobs extends org.apache.man
throws ManifoldCFException
{
// Begin transaction
- beginTransaction();
+ lockManager.enterReadLock(jobsLock);
try
{
// Put together cache key
@@ -686,19 +690,9 @@ public class Jobs extends org.apache.man
}
return loadMultiple(ids,readOnlies);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(jobsLock);
}
}
@@ -772,35 +766,43 @@ public class Jobs extends org.apache.man
public void delete(Long id)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getJobsKey());
- ssb.add(getJobStatusKey());
- ssb.add(getJobIDKey(id));
- StringSet cacheKeys = new StringSet(ssb);
- beginTransaction();
+ lockManager.enterWriteLock(jobsLock);
try
{
- scheduleManager.deleteRows(id);
- hopFilterManager.deleteRows(id);
- pipelineManager.deleteRows(id);
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(idField,id)});
- performDelete("WHERE "+query,params,cacheKeys);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getJobsKey());
+ ssb.add(getJobStatusKey());
+ ssb.add(getJobIDKey(id));
+ StringSet cacheKeys = new StringSet(ssb);
+ beginTransaction();
+ try
+ {
+ scheduleManager.deleteRows(id);
+ hopFilterManager.deleteRows(id);
+ pipelineManager.deleteRows(id);
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(idField,id)});
+ performDelete("WHERE "+query,params,cacheKeys);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ lockManager.leaveWriteLock(jobsLock);
}
}
@@ -867,141 +869,149 @@ public class Jobs extends org.apache.man
public void save(IJobDescription jobDescription)
throws ManifoldCFException
{
- // The invalidation keys for this are both the general and the specific.
- Long id = jobDescription.getID();
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getJobsKey());
- ssb.add(getJobStatusKey());
- ssb.add(getJobIDKey(id));
- StringSet invKeys = new StringSet(ssb);
-
- while (true)
+ lockManager.enterWriteLock(jobsLock);
+ try
{
- long sleepAmt = 0L;
- try
+ // The invalidation keys for this are both the general and the specific.
+ Long id = jobDescription.getID();
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getJobsKey());
+ ssb.add(getJobStatusKey());
+ ssb.add(getJobIDKey(id));
+ StringSet invKeys = new StringSet(ssb);
+
+ while (true)
{
- ICacheHandle ch = cacheManager.enterCache(null,invKeys,getTransactionID());
+ long sleepAmt = 0L;
try
{
- beginTransaction();
+ ICacheHandle ch = cacheManager.enterCache(null,invKeys,getTransactionID());
try
{
- //performLock();
- // See whether the instance exists
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(idField,id)});
- IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
- HashMap values = new HashMap();
- values.put(descriptionField,jobDescription.getDescription());
- values.put(connectionNameField,jobDescription.getConnectionName());
- String newXML = jobDescription.getSpecification().toXML();
- values.put(documentSpecField,newXML);
- values.put(typeField,typeToString(jobDescription.getType()));
- values.put(startMethodField,startMethodToString(jobDescription.getStartMethod()));
- values.put(intervalField,jobDescription.getInterval());
- values.put(maxIntervalField,jobDescription.getMaxInterval());
- values.put(reseedIntervalField,jobDescription.getReseedInterval());
- values.put(expirationField,jobDescription.getExpiration());
- values.put(priorityField,new Integer(jobDescription.getPriority()));
- values.put(hopcountModeField,hopcountModeToString(jobDescription.getHopcountMode()));
-
- if (set.getRowCount() > 0)
+ beginTransaction();
+ try
{
- // Update
- // We need to reset the seedingVersionField if there are any changes that
- // could affect what set of documents we allow!!!
-
- IResultRow row = set.getRow(0);
-
- // Determine whether we need to reset the scan time for documents.
- // Basically, any change to job parameters that could affect ingestion should clear isSame so that we
- // relook at all the documents, not just the recent ones.
+ //performLock();
+ // See whether the instance exists
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(idField,id)});
+ IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
+ query+" FOR UPDATE",params,null,null);
+ HashMap values = new HashMap();
+ values.put(descriptionField,jobDescription.getDescription());
+ values.put(connectionNameField,jobDescription.getConnectionName());
+ String newXML = jobDescription.getSpecification().toXML();
+ values.put(documentSpecField,newXML);
+ values.put(typeField,typeToString(jobDescription.getType()));
+ values.put(startMethodField,startMethodToString(jobDescription.getStartMethod()));
+ values.put(intervalField,jobDescription.getInterval());
+ values.put(maxIntervalField,jobDescription.getMaxInterval());
+ values.put(reseedIntervalField,jobDescription.getReseedInterval());
+ values.put(expirationField,jobDescription.getExpiration());
+ values.put(priorityField,new Integer(jobDescription.getPriority()));
+ values.put(hopcountModeField,hopcountModeToString(jobDescription.getHopcountMode()));
- boolean isSame = pipelineManager.compareRows(id,jobDescription);
- if (!isSame)
+ if (set.getRowCount() > 0)
{
- int currentStatus = stringToStatus((String)row.getValue(statusField));
- if (currentStatus == STATUS_ACTIVE || currentStatus == STATUS_ACTIVESEEDING ||
- currentStatus == STATUS_ACTIVE_UNINSTALLED || currentStatus == STATUS_ACTIVESEEDING_UNINSTALLED)
- values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+ // Update
+ // We need to reset the seedingVersionField if there are any changes that
+ // could affect what set of documents we allow!!!
+
+ IResultRow row = set.getRow(0);
+
+ // Determine whether we need to reset the scan time for documents.
+ // Basically, any change to job parameters that could affect ingestion should clear isSame so that we
+ // relook at all the documents, not just the recent ones.
+
+ boolean isSame = pipelineManager.compareRows(id,jobDescription);
+ if (!isSame)
+ {
+ int currentStatus = stringToStatus((String)row.getValue(statusField));
+ if (currentStatus == STATUS_ACTIVE || currentStatus == STATUS_ACTIVESEEDING ||
+ currentStatus == STATUS_ACTIVE_UNINSTALLED || currentStatus == STATUS_ACTIVESEEDING_UNINSTALLED)
+ values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+ }
+
+ if (isSame)
+ {
+ String oldDocSpecXML = (String)row.getValue(documentSpecField);
+ if (!oldDocSpecXML.equals(newXML))
+ isSame = false;
+ }
+
+ if (isSame)
+ isSame = hopFilterManager.compareRows(id,jobDescription);
+
+ if (!isSame)
+ values.put(seedingVersionField,null);
+
+ params.clear();
+ query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(idField,id)});
+ performUpdate(values," WHERE "+query,params,null);
+ pipelineManager.deleteRows(id);
+ scheduleManager.deleteRows(id);
+ hopFilterManager.deleteRows(id);
}
-
- if (isSame)
+ else
{
- String oldDocSpecXML = (String)row.getValue(documentSpecField);
- if (!oldDocSpecXML.equals(newXML))
- isSame = false;
- }
-
- if (isSame)
- isSame = hopFilterManager.compareRows(id,jobDescription);
-
- if (!isSame)
+ // Insert
+ values.put(startTimeField,null);
values.put(seedingVersionField,null);
+ values.put(endTimeField,null);
+ values.put(statusField,statusToString(STATUS_INACTIVE));
+ values.put(lastTimeField,new Long(System.currentTimeMillis()));
+ values.put(idField,id);
+ performInsert(values,null);
+ }
- params.clear();
- query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(idField,id)});
- performUpdate(values," WHERE "+query,params,null);
- pipelineManager.deleteRows(id);
- scheduleManager.deleteRows(id);
- hopFilterManager.deleteRows(id);
+ // Write pipeline rows
+ pipelineManager.writeRows(id,jobDescription);
+ // Write schedule records
+ scheduleManager.writeRows(id,jobDescription);
+ // Write hop filter rows
+ hopFilterManager.writeRows(id,jobDescription);
+
+ cacheManager.invalidateKeys(ch);
+ break;
}
- else
+ catch (ManifoldCFException e)
{
- // Insert
- values.put(startTimeField,null);
- values.put(seedingVersionField,null);
- values.put(endTimeField,null);
- values.put(statusField,statusToString(STATUS_INACTIVE));
- values.put(lastTimeField,new Long(System.currentTimeMillis()));
- values.put(idField,id);
- performInsert(values,null);
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
}
-
- // Write pipeline rows
- pipelineManager.writeRows(id,jobDescription);
- // Write schedule records
- scheduleManager.writeRows(id,jobDescription);
- // Write hop filter rows
- hopFilterManager.writeRows(id,jobDescription);
-
- cacheManager.invalidateKeys(ch);
- break;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ sleepAmt = getSleepAmt();
+ continue;
+ }
finally
{
- cacheManager.leaveCache(ch);
+ sleepFor(sleepAmt);
}
}
- catch (ManifoldCFException e)
- {
- if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- sleepAmt = getSleepAmt();
- continue;
- }
- finally
- {
- sleepFor(sleepAmt);
- }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(jobsLock);
}
}
@@ -2989,8 +2999,7 @@ public class Jobs extends org.apache.man
public IJobDescription[] findJobsForConnection(String connectionName)
throws ManifoldCFException
{
- // Begin transaction
- beginTransaction();
+ lockManager.enterReadLock(jobsLock);
try
{
// Put together cache key
@@ -3008,28 +3017,17 @@ public class Jobs extends org.apache.man
// Convert to an array of id's, and then load them
Long[] ids = new Long[set.getRowCount()];
boolean[] readOnlies = new boolean[set.getRowCount()];
- int i = 0;
- while (i < ids.length)
+ for (int i = 0; i < ids.length; i++)
{
IResultRow row = set.getRow(i);
ids[i] = (Long)row.getValue(idField);
- readOnlies[i++] = true;
+ readOnlies[i] = true;
}
return loadMultiple(ids,readOnlies);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(jobsLock);
}
}
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/repository/RepositoryConnectionManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/repository/RepositoryConnectionManager.java?rev=1642255&r1=1642254&r2=1642255&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/repository/RepositoryConnectionManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/repository/RepositoryConnectionManager.java Fri Nov 28 01:41:31 2014
@@ -67,14 +67,18 @@ public class RepositoryConnectionManager
protected static Random random = new Random();
// Handle for repository history manager
- protected RepositoryHistoryManager historyManager;
+ protected final RepositoryHistoryManager historyManager;
// Handle for throttle spec storage
- protected ThrottleSpecManager throttleSpecManager;
+ protected final ThrottleSpecManager throttleSpecManager;
// Cache manager
- ICacheManager cacheManager;
+ protected final ICacheManager cacheManager;
// Thread context
- IThreadContext threadContext;
+ protected final IThreadContext threadContext;
+ // Lock manager
+ protected final ILockManager lockManager;
+
+ protected final String repositoriesLock = "REPOSITORIES_LOCK";
/** Constructor.
*@param threadContext is the thread context.
@@ -87,6 +91,7 @@ public class RepositoryConnectionManager
historyManager = new RepositoryHistoryManager(threadContext,database);
throttleSpecManager = new ThrottleSpecManager(database);
cacheManager = CacheManagerFactory.make(threadContext);
+ lockManager = LockManagerFactory.make(threadContext);
this.threadContext = threadContext;
}
@@ -256,7 +261,7 @@ public class RepositoryConnectionManager
public IRepositoryConnection[] getAllConnections()
throws ManifoldCFException
{
- beginTransaction();
+ lockManager.enterReadLock(repositoriesLock);
try
{
// Read all the tools
@@ -273,19 +278,9 @@ public class RepositoryConnectionManager
}
return loadMultiple(names);
}
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
- }
finally
{
- endTransaction();
+ lockManager.leaveReadLock(repositoriesLock);
}
}
@@ -363,116 +358,124 @@ public class RepositoryConnectionManager
public boolean save(IRepositoryConnection object)
throws ManifoldCFException
{
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getRepositoryConnectionsKey());
- ssb.add(getRepositoryConnectionKey(object.getName()));
- StringSet cacheKeys = new StringSet(ssb);
- while (true)
+ lockManager.enterWriteLock(repositoriesLock);
+ try
{
- // Catch deadlock condition
- long sleepAmt = 0L;
- try
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getRepositoryConnectionsKey());
+ ssb.add(getRepositoryConnectionKey(object.getName()));
+ StringSet cacheKeys = new StringSet(ssb);
+ while (true)
{
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ // Catch deadlock condition
+ long sleepAmt = 0L;
try
{
- beginTransaction();
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- //performLock();
- // Notify of a change to the configuration
- ManifoldCF.noteConfigurationChange();
- boolean isNew = object.getIsNew();
- // See whether the instance exists
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,object.getName())});
- IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
- query+" FOR UPDATE",params,null,null);
- HashMap values = new HashMap();
- values.put(descriptionField,object.getDescription());
- values.put(classNameField,object.getClassName());
- values.put(groupNameField,object.getACLAuthority());
- values.put(maxCountField,new Long((long)object.getMaxConnections()));
- String configXML = object.getConfigParams().toXML();
- values.put(configField,configXML);
- boolean notificationNeeded = false;
- boolean isCreated;
-
- if (set.getRowCount() > 0)
+ beginTransaction();
+ try
{
- // If the object is supposedly new, it is bad that we found one that already exists.
- if (isNew)
- throw new ManifoldCFException("Repository connection '"+object.getName()+"' already exists");
- isCreated = false;
- IResultRow row = set.getRow(0);
- String oldXML = (String)row.getValue(configField);
- if (oldXML == null || !oldXML.equals(configXML))
- notificationNeeded = true;
-
- // Update
- params.clear();
- query = buildConjunctionClause(params,new ClauseDescription[]{
+ //performLock();
+ // Notify of a change to the configuration
+ ManifoldCF.noteConfigurationChange();
+ boolean isNew = object.getIsNew();
+ // See whether the instance exists
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
new UnitaryClause(nameField,object.getName())});
- performUpdate(values," WHERE "+query,params,null);
- throttleSpecManager.deleteRows(object.getName());
+ IResultSet set = performQuery("SELECT * FROM "+getTableName()+" WHERE "+
+ query+" FOR UPDATE",params,null,null);
+ HashMap values = new HashMap();
+ values.put(descriptionField,object.getDescription());
+ values.put(classNameField,object.getClassName());
+ values.put(groupNameField,object.getACLAuthority());
+ values.put(maxCountField,new Long((long)object.getMaxConnections()));
+ String configXML = object.getConfigParams().toXML();
+ values.put(configField,configXML);
+ boolean notificationNeeded = false;
+ boolean isCreated;
+
+ if (set.getRowCount() > 0)
+ {
+ // If the object is supposedly new, it is bad that we found one that already exists.
+ if (isNew)
+ throw new ManifoldCFException("Repository connection '"+object.getName()+"' already exists");
+ isCreated = false;
+ IResultRow row = set.getRow(0);
+ String oldXML = (String)row.getValue(configField);
+ if (oldXML == null || !oldXML.equals(configXML))
+ notificationNeeded = true;
+
+ // Update
+ params.clear();
+ query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,object.getName())});
+ performUpdate(values," WHERE "+query,params,null);
+ throttleSpecManager.deleteRows(object.getName());
+ }
+ else
+ {
+ // If the object is not supposed to be new, it is bad that we did not find one.
+ if (!isNew)
+ throw new ManifoldCFException("Repository connection '"+object.getName()+"' no longer exists");
+ isCreated = true;
+ // Insert
+ values.put(nameField,object.getName());
+ // We only need the general key because this is new.
+ performInsert(values,null);
+ }
+
+ // Write secondary table stuff
+ throttleSpecManager.writeRows(object.getName(),object);
+
+ // If notification required, do it.
+ if (notificationNeeded)
+ {
+ IJobManager jobManager = JobManagerFactory.make(threadContext);
+ jobManager.noteConnectionChange(object.getName());
+ }
+
+ cacheManager.invalidateKeys(ch);
+ return isCreated;
}
- else
+ catch (ManifoldCFException e)
{
- // If the object is not supposed to be new, it is bad that we did not find one.
- if (!isNew)
- throw new ManifoldCFException("Repository connection '"+object.getName()+"' no longer exists");
- isCreated = true;
- // Insert
- values.put(nameField,object.getName());
- // We only need the general key because this is new.
- performInsert(values,null);
+ signalRollback();
+ throw e;
}
-
- // Write secondary table stuff
- throttleSpecManager.writeRows(object.getName(),object);
-
- // If notification required, do it.
- if (notificationNeeded)
+ catch (Error e)
{
- IJobManager jobManager = JobManagerFactory.make(threadContext);
- jobManager.noteConnectionChange(object.getName());
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
}
-
- cacheManager.invalidateKeys(ch);
- return isCreated;
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
+ catch (ManifoldCFException e)
+ {
+ // Is this a deadlock exception? If so, we want to try again.
+ if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+ throw e;
+ sleepAmt = getSleepAmt();
+ }
finally
{
- cacheManager.leaveCache(ch);
+ sleepFor(sleepAmt);
}
}
- catch (ManifoldCFException e)
- {
- // Is this a deadlock exception? If so, we want to try again.
- if (e.getErrorCode() != ManifoldCFException.DATABASE_TRANSACTION_ABORT)
- throw e;
- sleepAmt = getSleepAmt();
- }
- finally
- {
- sleepFor(sleepAmt);
- }
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(repositoriesLock);
}
}
@@ -485,49 +488,55 @@ public class RepositoryConnectionManager
{
// Grab a job manager handle. We will need to check if any jobs refer to this connection.
IJobManager jobManager = JobManagerFactory.make(threadContext);
-
- StringSetBuffer ssb = new StringSetBuffer();
- ssb.add(getRepositoryConnectionsKey());
- ssb.add(getRepositoryConnectionKey(name));
- StringSet cacheKeys = new StringSet(ssb);
- ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
+ lockManager.enterWriteLock(repositoriesLock);
try
{
- beginTransaction();
+ StringSetBuffer ssb = new StringSetBuffer();
+ ssb.add(getRepositoryConnectionsKey());
+ ssb.add(getRepositoryConnectionKey(name));
+ StringSet cacheKeys = new StringSet(ssb);
+ ICacheHandle ch = cacheManager.enterCache(null,cacheKeys,getTransactionID());
try
{
- // Check if any jobs refer to this connection name
- if (jobManager.checkIfReference(name))
- throw new ManifoldCFException("Can't delete repository connection '"+name+"': existing jobs refer to it");
- ManifoldCF.noteConfigurationChange();
- throttleSpecManager.deleteRows(name);
- historyManager.deleteOwner(name);
- ArrayList params = new ArrayList();
- String query = buildConjunctionClause(params,new ClauseDescription[]{
- new UnitaryClause(nameField,name)});
- performDelete("WHERE "+query,params,null);
- cacheManager.invalidateKeys(ch);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ beginTransaction();
+ try
+ {
+ // Check if any jobs refer to this connection name
+ if (jobManager.checkIfReference(name))
+ throw new ManifoldCFException("Can't delete repository connection '"+name+"': existing jobs refer to it");
+ ManifoldCF.noteConfigurationChange();
+ throttleSpecManager.deleteRows(name);
+ historyManager.deleteOwner(name);
+ ArrayList params = new ArrayList();
+ String query = buildConjunctionClause(params,new ClauseDescription[]{
+ new UnitaryClause(nameField,name)});
+ performDelete("WHERE "+query,params,null);
+ cacheManager.invalidateKeys(ch);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ cacheManager.leaveCache(ch);
}
}
finally
{
- cacheManager.leaveCache(ch);
+ lockManager.leaveWriteLock(repositoriesLock);
}
-
}
/** Return true if the specified authority name is referenced.