You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/08/21 03:42:56 UTC

svn commit: r1375365 - in /manifoldcf/trunk: CHANGES.txt framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfaceMySQL.java

Author: kwright
Date: Tue Aug 21 01:42:55 2012
New Revision: 1375365

URL: http://svn.apache.org/viewvc?rev=1375365&view=rev
Log:
Fix for CONNECTORS-510.

Modified:
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfaceMySQL.java

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1375365&r1=1375364&r2=1375365&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Tue Aug 21 01:42:55 2012
@@ -3,6 +3,10 @@ $Id$
 
 ======================= 0.7-dev =====================
 
+CONNECTORS-510: MySQL now periodically runs ANALYZE on tables
+to be sure the plan(s) are reasonable.
+(Karl Wright)
+
 CONNECTORS-509: Allow JDBC Connector to choose which column
 access method to use.
 (Shigeki Kobayashi, Karl Wright)

Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfaceMySQL.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfaceMySQL.java?rev=1375365&r1=1375364&r2=1375365&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfaceMySQL.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfaceMySQL.java Tue Aug 21 01:42:55 2012
@@ -34,9 +34,29 @@ public class DBInterfaceMySQL extends Da
   
   private static final String _driver = "com.mysql.jdbc.Driver";
 
+  /** A lock manager handle. */
+  protected ILockManager lockManager;
+
   // Once we enter the serializable realm, STOP any additional transactions from doing anything at all.
   protected int serializableDepth = 0;
 
+  // This is where we keep track of tables that we need to analyze on transaction exit
+  protected List<String> tablesToAnalyze = new ArrayList<String>();
+
+  // This is where we keep temporary table statistics, which accumulate until they reach a threshold, and then are added into shared memory.
+  
+  /** Accumulated analyze statistics.  This map is keyed by the table name, and contains TableStatistics values. */
+  protected static Map<String,TableStatistics> currentAnalyzeStatistics = new HashMap<String,TableStatistics>();
+  /** Table analyze thresholds, as read from configuration information.  Keyed by table name, contains Integer values. */
+  protected static Map<String,Integer> analyzeThresholds = new HashMap<String,Integer>();
+  
+  /** The number of inserts, deletes, etc. before we update the shared area. */
+  protected static final int commitThreshold = 100;
+
+  // Lock and shared datum name prefixes (to be combined with table names)
+  protected static final String statslockAnalyzePrefix = "statslock-analyze-";
+  protected static final String statsAnalyzePrefix = "stats-analyze-";
+
   protected String cacheKey;
 
   public DBInterfaceMySQL(IThreadContext tc, String databaseName, String userName, String password)
@@ -44,6 +64,7 @@ public class DBInterfaceMySQL extends Da
   {
     super(tc,getJdbcUrl(databaseName),_driver,databaseName,userName,password);
     cacheKey = CacheKeyFactory.makeDatabaseKey(this.databaseName);
+    lockManager = LockManagerFactory.make(tc);
   }
 
   private static String getJdbcUrl(String theDatabaseName)
@@ -486,13 +507,63 @@ public class DBInterfaceMySQL extends Da
     performModification("DROP INDEX "+indexName+" ON "+tableName,null,null);
   }
 
+  /** Read a datum, presuming zero if the datum does not exist.
+  */
+  protected int readDatum(String datumName)
+    throws ManifoldCFException
+  {
+    byte[] bytes = lockManager.readData(datumName);
+    if (bytes == null)
+      return 0;
+    return (((int)bytes[0]) & 0xff) + ((((int)bytes[1]) & 0xff) << 8) + ((((int)bytes[2]) & 0xff) << 16) + ((((int)bytes[3]) & 0xff) << 24);
+  }
+
+  /** Write a datum, presuming zero if the datum does not exist.
+  */
+  protected void writeDatum(String datumName, int value)
+    throws ManifoldCFException
+  {
+    byte[] bytes = new byte[4];
+    bytes[0] = (byte)(value & 0xff);
+    bytes[1] = (byte)((value >> 8) & 0xff);
+    bytes[2] = (byte)((value >> 16) & 0xff);
+    bytes[3] = (byte)((value >> 24) & 0xff);
+    
+    lockManager.writeData(datumName,bytes);
+  }
+
   /** Analyze a table.
   *@param tableName is the name of the table to analyze/calculate statistics for.
   */
   public void analyzeTable(String tableName)
     throws ManifoldCFException
   {
-    // Does nothing
+    String tableStatisticsLock = statslockAnalyzePrefix+tableName;
+    lockManager.enterWriteCriticalSection(tableStatisticsLock);
+    try
+    {
+      TableStatistics ts = currentAnalyzeStatistics.get(tableName);
+      // Lock this table's statistics files
+      lockManager.enterWriteLock(tableStatisticsLock);
+      try
+      {
+        String eventDatum = statsAnalyzePrefix+tableName;
+        // Time to reindex this table!
+        analyzeTableInternal(tableName);
+        // Now, clear out the data
+        writeDatum(eventDatum,0);
+        if (ts != null)
+          ts.reset();
+      }
+      finally
+      {
+        lockManager.leaveWriteLock(tableStatisticsLock);
+      }
+    }
+    finally
+    {
+      lockManager.leaveWriteCriticalSection(tableStatisticsLock);
+    }
   }
 
   /** Reindex a table.
@@ -504,6 +575,15 @@ public class DBInterfaceMySQL extends Da
     // Does nothing
   }
 
+  protected void analyzeTableInternal(String tableName)
+    throws ManifoldCFException
+  {
+    if (getTransactionID() == null)
+      performModification("ANALYZE "+tableName,null,null);
+    else
+      tablesToAnalyze.add(tableName);
+  }
+
   /** Perform a table drop operation.
   *@param tableName is the name of the table to drop.
   *@param invalidateKeys are the cache keys that should be invalidated, if any.
@@ -1110,6 +1190,14 @@ public class DBInterfaceMySQL extends Da
       return;
     }
     super.endTransaction();
+    if (getTransactionID() == null)
+    {
+      for (int i = 0; i < tablesToAnalyze.size(); i++)
+      {
+        analyzeTableInternal(tablesToAnalyze.get(i));
+      }
+      tablesToAnalyze.clear();
+    }
   }
 
   /** Abstract method to roll back a transaction */
@@ -1127,5 +1215,103 @@ public class DBInterfaceMySQL extends Da
 
   }
 
+  /** Note a number of inserts, modifications, or deletions to a specific table.  This is so we can decide when to do appropriate maintenance.
+  *@param tableName is the name of the table being modified.
+  *@param insertCount is the number of inserts.
+  *@param modifyCount is the number of updates.
+  *@param deleteCount is the number of deletions.
+  */
+  @Override
+  protected void noteModificationsNoTransactions(String tableName, int insertCount, int modifyCount, int deleteCount)
+    throws ManifoldCFException
+  {
+    String tableStatisticsLock;
+    int eventCount;
+    
+    // Analysis.
+    // Here we count tuple addition.
+    eventCount = modifyCount + insertCount;
+    tableStatisticsLock = statslockAnalyzePrefix+tableName;
+    lockManager.enterWriteCriticalSection(tableStatisticsLock);
+    try
+    {
+      Integer threshold = analyzeThresholds.get(tableName);
+      int analyzeThreshold;
+      if (threshold == null)
+      {
+        // Look for this parameter; if we don't find it, use a default value.
+        analyzeThreshold = ManifoldCF.getIntProperty("org.apache.manifold.db.mysql.analyze."+tableName,10000);
+        analyzeThresholds.put(tableName,new Integer(analyzeThreshold));
+      }
+      else
+        analyzeThreshold = threshold.intValue();
+      
+      TableStatistics ts = currentAnalyzeStatistics.get(tableName);
+      if (ts == null)
+      {
+        ts = new TableStatistics();
+        currentAnalyzeStatistics.put(tableName,ts);
+      }
+      ts.add(eventCount);
+      // Check if we have passed threshold yet for this table, for committing the data to the shared area
+      if (ts.getEventCount() >= commitThreshold)
+      {
+        // Lock this table's statistics files
+        lockManager.enterWriteLock(tableStatisticsLock);
+        try
+        {
+          String eventDatum = statsAnalyzePrefix+tableName;
+          int oldEventCount = readDatum(eventDatum);
+          oldEventCount += ts.getEventCount();
+          if (oldEventCount >= analyzeThreshold)
+          {
+            // Time to reindex this table!
+            analyzeTableInternal(tableName);
+            // Now, clear out the data
+            writeDatum(eventDatum,0);
+          }
+          else
+            writeDatum(eventDatum,oldEventCount);
+          ts.reset();
+        }
+        finally
+        {
+          lockManager.leaveWriteLock(tableStatisticsLock);
+        }
+      }
+    }
+    finally
+    {
+      lockManager.leaveWriteCriticalSection(tableStatisticsLock);
+    }
+
+  }
+
+  /** Table accumulation records.
+  */
+  protected static class TableStatistics
+  {
+    protected int eventCount = 0;
+    
+    public TableStatistics()
+    {
+    }
+    
+    public void reset()
+    {
+      eventCount = 0;
+    }
+    
+    public void add(int eventCount)
+    {
+      this.eventCount += eventCount;
+    }
+    
+    public int getEventCount()
+    {
+      return eventCount;
+    }
+  }
+
 }