You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/16 02:26:29 UTC

svn commit: r1551099 - in /manifoldcf/branches/CONNECTORS-829/framework: ./ core/src/main/java/org/apache/manifoldcf/core/interfaces/ core/src/main/java/org/apache/manifoldcf/core/throttler/ core/src/test/java/org/apache/manifoldcf/core/throttler/

Author: kwright
Date: Mon Dec 16 01:26:29 2013
New Revision: 1551099

URL: http://svn.apache.org/r1551099
Log:
Add throttler test (which doesn't work yet)

Added:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/
    manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java   (with props)
Modified:
    manifoldcf/branches/CONNECTORS-829/framework/build.xml
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/build.xml?rev=1551099&r1=1551098&r2=1551099&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/build.xml (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/build.xml Mon Dec 16 01:26:29 2013
@@ -1482,6 +1482,7 @@
             <test name="org.apache.manifoldcf.core.common.DateTest" todir="test-output"/>
             <test name="org.apache.manifoldcf.core.fuzzyml.TestFuzzyML" todir="test-output"/>
             <test name="org.apache.manifoldcf.core.lockmanager.TestZooKeeperLocks" todir="test-output"/>
+            <test name="org.apache.manifoldcf.core.throttler.TestThrottler" todir="test-output"/>
 
         </junit>
     </target>

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java?rev=1551099&r1=1551098&r2=1551099&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java Mon Dec 16 01:26:29 2013
@@ -27,7 +27,7 @@ public interface IThrottleSpec
   public static final String _rcsid = "@(#)$Id$";
 
   /** Given a bin name, find the max open connections to use for that bin.
-  *@return -1 if no limit found.
+  *@return Integer.MAX_VALUE if no limit found.
   */
   public int getMaxOpenConnections(String binName);
 

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1551099&r1=1551098&r2=1551099&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Mon Dec 16 01:26:29 2013
@@ -154,7 +154,7 @@ public class ConnectionBin
   
   public static final int CONNECTION_DESTROY = 0;
   public static final int CONNECTION_POOLEMPTY = 1;
-  public static final int CONNECTION_WITHINBOUNDS =2;
+  public static final int CONNECTION_WITHINBOUNDS = 2;
   
   /** Figure out whether we are currently over target or not for this bin, and whether a
   * connection should be pulled from the pool and destroyed.

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java?rev=1551099&r1=1551098&r2=1551099&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleGroups.java Mon Dec 16 01:26:29 2013
@@ -89,6 +89,7 @@ public class ThrottleGroups implements I
   @Override
   public IConnectionThrottler obtainConnectionThrottler(String throttleGroupType, String throttleGroup, String[] binNames)
   {
+    java.util.Arrays.sort(binNames);
     return throttler.obtainConnectionThrottler(throttleGroupType, throttleGroup, binNames);
   }
 

Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java?rev=1551099&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java Mon Dec 16 01:26:29 2013
@@ -0,0 +1,441 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import org.junit.*;
+import static org.junit.Assert.*;
+
+public class TestThrottler extends org.apache.manifoldcf.core.tests.BaseDerby
+{
+  @Test
+  public void multiThreadConnectionPoolTest()
+    throws Exception
+  {
+    // First, create the throttle group.
+    IThreadContext threadContext = ThreadContextFactory.make();
+    IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+    tg.createOrUpdateThrottleGroup("test","test",new ThrottleSpec());
+    
+    // We create a pretend connection pool
+    IConnectionThrottler connectionThrottler = tg.obtainConnectionThrottler("test","test",new String[]{"A","B","C"});
+    System.out.println("Connection throttler obtained");
+    
+    // How best to test this?
+    // Well, what I'm going to do is to have multiple threads active.  Each one will do perfectly sensible things
+    // while generating a log that includes timestamps for everything that happens.  At the end, the log will be
+    // analyzed for violations of throttling policy.
+    
+    EventLog eventLog = new EventLog();
+    
+    int numThreads = 10;
+    
+    TesterThread[] threads = new TesterThread[numThreads];
+    for (int i = 0; i < numThreads; i++)
+    {
+      threads[i] = new TesterThread(connectionThrottler, eventLog);
+      threads[i].start();
+    }
+    
+    // Now, join all the threads at the end
+    for (int i = 0; i < numThreads; i++)
+    {
+      threads[i].finishUp();
+    }
+    
+    // Finally, do the log analysis
+    eventLog.analyze();
+    
+    System.out.println("Done test");
+  }
+  
+  protected static class TesterThread extends Thread
+  {
+    protected final EventLog eventLog;
+    protected final IConnectionThrottler connectionThrottler;
+    protected Throwable exception = null;
+    
+    public TesterThread(IConnectionThrottler connectionThrottler, EventLog eventLog)
+    {
+      this.connectionThrottler = connectionThrottler;
+      this.eventLog = eventLog;
+    }
+    
+    public void run()
+    {
+      try
+      {
+        int numberConnectionCycles = 3;
+        int numberFetchesPerCycle = 3;
+        
+        for (int k = 0; k < numberConnectionCycles; k++)
+        {
+          // First grab a connection.
+          int rval = connectionThrottler.waitConnectionAvailable();
+          if (rval == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+            return;
+          IFetchThrottler fetchThrottler;
+          if (rval == IConnectionThrottler.CONNECTION_FROM_CREATION)
+          {
+            // Pretend to create the connection
+            eventLog.addLogEntry(new ConnectionCreatedEvent());
+          }
+          else
+          {
+            // Pretend to get it from the pool
+            eventLog.addLogEntry(new ConnectionFromPoolEvent());
+          }
+          fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+
+          for (int l = 0; l < numberFetchesPerCycle; l++)
+          {
+            // Perform a fake fetch
+            IStreamThrottler streamThrottler = fetchThrottler.obtainFetchDocumentPermission();
+            if (streamThrottler == null)
+              return;
+            eventLog.addLogEntry(new FetchStartEvent());
+            // Do one read
+            if (streamThrottler.obtainReadPermission(1000) == false)
+              return;
+            eventLog.addLogEntry(new ReadStartEvent(1000));
+            streamThrottler.releaseReadPermission(1000, 1000);
+            eventLog.addLogEntry(new ReadDoneEvent(1000));
+            // Do another read
+            if (streamThrottler.obtainReadPermission(1000) == false)
+              return;
+            eventLog.addLogEntry(new ReadStartEvent(1000));
+            streamThrottler.releaseReadPermission(1000, 1000);
+            eventLog.addLogEntry(new ReadDoneEvent(1000));
+            // Do a third read
+            if (streamThrottler.obtainReadPermission(1000) == false)
+              return;
+            eventLog.addLogEntry(new ReadStartEvent(1000));
+            streamThrottler.releaseReadPermission(1000, 100);
+            eventLog.addLogEntry(new ReadDoneEvent(100));
+            // Close the stream
+            streamThrottler.closeStream();
+            eventLog.addLogEntry(new FetchDoneEvent());
+          }
+          
+          // Pretend to release the connection
+          boolean destroyIt = connectionThrottler.noteReturnedConnection();
+          if (destroyIt)
+          {
+            eventLog.addLogEntry(new ConnectionDestroyedEvent());
+            connectionThrottler.noteConnectionDestroyed();
+          }
+          else
+          {
+            eventLog.addLogEntry(new ConnectionReturnedToPoolEvent());
+            connectionThrottler.noteConnectionReturnedToPool();
+          }
+        }
+      }
+      catch (Exception e)
+      {
+        exception = e;
+      }
+    }
+    
+    public void finishUp()
+      throws Exception
+    {
+      join();
+      if (exception != null)
+      {
+        if (exception instanceof RuntimeException)
+          throw (RuntimeException)exception;
+        else if (exception instanceof Error)
+          throw (Error)exception;
+        else if (exception instanceof Exception)
+          throw (Exception)exception;
+        else
+          throw new RuntimeException("Unknown exception: "+exception.getClass().getName()+": "+exception.getMessage(),exception);
+      }
+    }
+    
+  }
+  
+  protected static class ThrottleSpec implements IThrottleSpec
+  {
+    public ThrottleSpec()
+    {
+    }
+    
+    /** Given a bin name, find the max open connections to use for that bin.
+    *@return -1 if no limit found.
+    */
+    @Override
+    public int getMaxOpenConnections(String binName)
+    {
+      if (binName.equals("A"))
+        return 2;
+      if (binName.equals("B"))
+        return 1;
+      return Integer.MAX_VALUE;
+    }
+
+    /** Look up minimum milliseconds per byte for a bin.
+    *@return 0.0 if no limit found.
+    */
+    @Override
+    public double getMinimumMillisecondsPerByte(String binName)
+    {
+      if (binName.equals("B"))
+        return 10.0;
+      if (binName.equals("C"))
+        return 5.0;
+      return 0.0;
+    }
+
+    /** Look up minimum milliseconds for a fetch for a bin.
+    *@return 0 if no limit found.
+    */
+    @Override
+    public long getMinimumMillisecondsPerFetch(String binName)
+    {
+      if (binName.equals("A"))
+        return 5;
+      if (binName.equals("C"))
+        return 20;
+      return 0;
+    }
+
+  }
+  
+  protected static class EventLog
+  {
+    protected final List<LogEntry> logList = new ArrayList<LogEntry>();
+    
+    public EventLog()
+    {
+    }
+    
+    public synchronized void addLogEntry(LogEntry x)
+    {
+      System.out.println(x.toString());
+      logList.add(x);
+    }
+    
+    public synchronized void analyze()
+      throws Exception
+    {
+      State s = new State();
+      for (LogEntry l : logList)
+      {
+        l.apply(s);
+      }
+      // Success!
+    }
+    
+  }
+  
+  protected static abstract class LogEntry
+  {
+    protected final long timestamp;
+    
+    public LogEntry(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+    
+    public abstract void apply(State state)
+      throws Exception;
+    
+    public String toString()
+    {
+      return "Time: "+timestamp;
+    }
+    
+  }
+  
+  protected static class ConnectionCreatedEvent extends LogEntry
+  {
+    public ConnectionCreatedEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection created";
+    }
+
+  }
+
+  protected static class ConnectionDestroyedEvent extends LogEntry
+  {
+    public ConnectionDestroyedEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection destroyed";
+    }
+
+  }
+
+  protected static class ConnectionFromPoolEvent extends LogEntry
+  {
+    public ConnectionFromPoolEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection from pool";
+    }
+
+  }
+
+  protected static class ConnectionReturnedToPoolEvent extends LogEntry
+  {
+    public ConnectionReturnedToPoolEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Connection back to pool";
+    }
+
+  }
+
+  protected static class FetchStartEvent extends LogEntry
+  {
+    public FetchStartEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Fetch start";
+    }
+  }
+  
+  protected static class FetchDoneEvent extends LogEntry
+  {
+    public FetchDoneEvent()
+    {
+      super(System.currentTimeMillis());
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Fetch done";
+    }
+  }
+  
+  protected static class ReadStartEvent extends LogEntry
+  {
+    final int proposed;
+    
+    public ReadStartEvent(int proposed)
+    {
+      super(System.currentTimeMillis());
+      this.proposed = proposed;
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Read start("+proposed+")";
+    }
+  }
+
+  protected static class ReadDoneEvent extends LogEntry
+  {
+    final int actual;
+    
+    public ReadDoneEvent(int actual)
+    {
+      super(System.currentTimeMillis());
+      this.actual = actual;
+    }
+    
+    public void apply(State state)
+      throws Exception
+    {
+      // MHL
+    }
+    
+    public String toString()
+    {
+      return super.toString() + "; Read done("+actual+")";
+    }
+  }
+
+  protected static class State
+  {
+    public int outstandingConnections = 0;
+    public long lastFetch = 0L;
+    public long lastByteRead = 0L;
+    public int lastByteAmt = 0;
+  }
+  
+}

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/TestThrottler.java
------------------------------------------------------------------------------
    svn:keywords = Id