You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by nu...@apache.org on 2010/09/22 23:57:09 UTC

svn commit: r1000252 - in /commons/sandbox/pipeline/trunk: ./ src/main/java/org/apache/commons/pipeline/driver/control/ src/test/java/org/apache/commons/pipeline/driver/control/ src/test/java/org/apache/commons/pipeline/stage/

Author: nuttycom
Date: Wed Sep 22 21:57:09 2010
New Revision: 1000252

URL: http://svn.apache.org/viewvc?rev=1000252&view=rev
Log:
minor formatting improvements, changed instances of System.out.println to log.debug

Added:
    commons/sandbox/pipeline/trunk/.gitignore
Modified:
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
    commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
    commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java

Added: commons/sandbox/pipeline/trunk/.gitignore
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/.gitignore?rev=1000252&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/.gitignore (added)
+++ commons/sandbox/pipeline/trunk/.gitignore Wed Sep 22 21:57:09 2010
@@ -0,0 +1,2 @@
+nbproject
+target

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java Wed Sep 22 21:57:09 2010
@@ -14,43 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.commons.pipeline.driver.control;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pipeline.Stage;
 
 /**
  * An implementation of DriverControlStrategy that measures stage execution times
  * and increases thread counts that are taking longer than other stages on average
- *
- * @author mirror
  */
 public class EqualizingDriverControlStrategy implements DriverControlStrategy {
-    
+
+    Log log = LogFactory.getLog(EqualizingDriverControlStrategy.class);
+
     private static class Tuple {
+
         private int count = 0;
         private long duration = 0;
-        
-        Tuple() { }
-        
+
+        Tuple() {
+        }
+
         public void add(long duration) {
             count++;
             this.duration += duration;
         }
     }
-    
+
     /** Creates a new instance of EqualizingDriverControlStrategy */
     public EqualizingDriverControlStrategy() {
     }
-    
+
     public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
-        if (events.isEmpty()) return;
-        
-        Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+        if (events.isEmpty()) {
+            return;
+        }
+
+        Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>();
         long total = 0;
         for (StageProcessTimingEvent ev : events) {
             Tuple tuple = timings.get((Stage) ev.getSource());
@@ -58,37 +63,37 @@ public class EqualizingDriverControlStra
                 tuple = new Tuple();
                 timings.put((Stage) ev.getSource(), tuple);
             }
-            
+
             tuple.add(ev.getLatency());
             total += ev.getLatency();
         }
-        
-        //System.out.println("Events handled: " + events.size());
-        System.out.print("Stage latencies: ");
-        for (Map.Entry<Stage,Tuple> entry : timings.entrySet()) {
-            System.out.print(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; ");
-        }
-        System.out.println();
-        //System.out.println("Total latency: " + total);
-        
+
+        if (log.isDebugEnabled()) {
+            log.debug("Events handled: " + events.size());
+            log.debug("Stage latencies: ");
+            for (Map.Entry<Stage, Tuple> entry : timings.entrySet()) {
+                log.debug(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; ");
+            }
+            log.debug("Total latency: " + total);
+        }
+
         double mean = total / events.size();
-        //System.out.println("Mean latency: " + mean);
-        
+        //log.debug("Mean latency: " + mean);
+
         for (PrioritizableStageDriver driver : drivers) {
             Tuple tuple = timings.get(driver.getStage());
             if (tuple != null) {
-            long averageDuration = tuple.duration / tuple.count;
-            if (averageDuration > mean + allowableDelta) {
-                    System.out.println("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
-                driver.increasePriority(1);
-            } else if (averageDuration < mean - allowableDelta) {
-                driver.decreasePriority(1);
-                    System.out.println("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+                long averageDuration = tuple.duration / tuple.count;
+                if (averageDuration > mean + allowableDelta) {
+                    log.debug("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+                    driver.increasePriority(1);
+                } else if (averageDuration < mean - allowableDelta) {
+                    driver.decreasePriority(1);
+                    log.debug("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
                 }
             }
         }
     }
-
     /**
      * Holds value of property allowableDelta.
      */
@@ -109,5 +114,4 @@ public class EqualizingDriverControlStra
     public void setAllowableDelta(long allowableDelta) {
         this.allowableDelta = allowableDelta;
     }
-    
 }

Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java Wed Sep 22 21:57:09 2010
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.commons.pipeline.driver.control;
 
 import java.util.HashMap;
@@ -22,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pipeline.Stage;
 
 /**
@@ -29,10 +30,11 @@ import org.apache.commons.pipeline.Stage
  * increases and decreases priorities to see if performance is improved.  If
  * a performance improvement is found, additional experiments are done in the
  * same direction
- *
- * @author braeckel
  */
 public class ExperimentalDriverControlStrategy implements DriverControlStrategy {
+
+    private Log log = LogFactory.getLog(ExperimentalDriverControlStrategy.class);
+    
     /**
      * The minimum time difference (in percent) between different analyses of a stage for
      * modifications to take place.  In other words, if a stage is stable with
@@ -40,44 +42,57 @@ public class ExperimentalDriverControlSt
      */
     private int minDifferencePercent = 3;
 
-
     private enum Action {
-        Decrease { void execute( PrioritizableStageDriver driver ){driver.decreasePriority( 1 ); } },
-        Increase { void execute( PrioritizableStageDriver driver ){driver.increasePriority( 1 ); } },
-        None { void execute( PrioritizableStageDriver driver ){ /*do nothing*/ } };
+
+        Decrease {
+
+            void execute(PrioritizableStageDriver driver) {
+                driver.decreasePriority(1);
+            }
+        },
+        Increase {
+
+            void execute(PrioritizableStageDriver driver) {
+                driver.increasePriority(1);
+            }
+        },
+        None {
+
+            void execute(PrioritizableStageDriver driver) { /*do nothing*/ }
+        };
 
         abstract void execute(PrioritizableStageDriver driver);
     }
 
     private class Tuple {
+
         private int count = 0;
         private long duration = 0;
         private Action lastAction = Action.None;
 
-        Tuple() { }
+        Tuple() {
+        }
 
         public void add(long duration) {
             count++;
             this.duration += duration;
         }
     }
-
-    private Map<Stage, Tuple> lastTimings = new HashMap<Stage,Tuple>();
+    private Map<Stage, Tuple> lastTimings = new HashMap<Stage, Tuple>();
 
     /** Creates a new instance of EqualizingDriverControlStrategy */
     public ExperimentalDriverControlStrategy() {
     }
 
-    public ExperimentalDriverControlStrategy( int minDifferencePercent ){
-        if( minDifferencePercent < 0 || minDifferencePercent > 100 )
-        {
-          throw new IllegalArgumentException( "Minimum difference percent must be between 0 and 100" );
+    public ExperimentalDriverControlStrategy(int minDifferencePercent) {
+        if (minDifferencePercent < 0 || minDifferencePercent > 100) {
+            throw new IllegalArgumentException("Minimum difference percent must be between 0 and 100");
         }
         this.minDifferencePercent = minDifferencePercent;
     }
 
     public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
-        Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+        Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>();
         for (StageProcessTimingEvent ev : events) {
             Tuple tuple = timings.get(ev.getSource());
             if (tuple == null) {
@@ -90,92 +105,86 @@ public class ExperimentalDriverControlSt
 
         for (PrioritizableStageDriver driver : drivers) {
             Tuple mostRecentTiming = timings.get(driver.getStage());
-            Tuple previousTiming = lastTimings.get( driver.getStage() );
+            Tuple previousTiming = lastTimings.get(driver.getStage());
             double avgMostRecentDuration = mostRecentTiming.duration / mostRecentTiming.count;
             //first time around, try increasing priority
-            if( previousTiming == null )
-            {
+            if (previousTiming == null) {
                 mostRecentTiming.lastAction = Action.Increase;
-                driver.increasePriority( 1 );
+                driver.increasePriority(1);
             }
 
-            if( previousTiming != null ){
+            if (previousTiming != null) {
                 double avgPreviousTiming = previousTiming.duration / previousTiming.count;
                 //if the performance has decreased significantly...
                 double timingDifference = avgPreviousTiming - avgMostRecentDuration;
 
-                System.out.println( "Performance went from "+avgPreviousTiming + " to "+avgMostRecentDuration +"("+timingDifference+")");
+                log.debug("Performance went from " + avgPreviousTiming + " to " + avgMostRecentDuration + "(" + timingDifference + ")");
                 //if the timing difference was significant enough to work with...
                 double minDifference = avgPreviousTiming * (minDifferencePercent / 100.0);
-                if( Math.abs( timingDifference ) >= minDifference )
-                {
+                if (Math.abs(timingDifference) >= minDifference) {
                     //if the diff is positive, we have a performance improvement
-                    if( timingDifference > 0 )
-                    {
+                    if (timingDifference > 0) {
                         //continue whatever we did last time to try and get further
                         //improvement
-                        if( previousTiming.lastAction == Action.Increase ){
-                            driver.increasePriority( 1 );
+                        if (previousTiming.lastAction == Action.Increase) {
+                            driver.increasePriority(1);
                             mostRecentTiming.lastAction = Action.Increase;
-                        }
-                        else if( previousTiming.lastAction == Action.Decrease ){
-                            driver.decreasePriority( 1 );
+                        } else if (previousTiming.lastAction == Action.Decrease) {
+                            driver.decreasePriority(1);
                             mostRecentTiming.lastAction = Action.Decrease;
-                        }
-                        //there was no last action.  Try a random action
-                        else{
-                            System.out.println( "Significant performance change without a previous action: RANDOM action");
+                        } //there was no last action.  Try a random action
+                        else {
+                            log.debug("Significant performance change without a previous action: RANDOM action");
                             Action randomAction = getRandomAction();
                             mostRecentTiming.lastAction = randomAction;
-                            randomAction.execute( driver );
+                            randomAction.execute(driver);
                         }
-                    }
-                    //there was a performance degradation, reverse our last step
-                    else
-                    {
+                    } //there was a performance degradation, reverse our last step
+                    else {
                         //reverse whatever we did last time to try and get further
                         //improvement
-                        if( previousTiming.lastAction == Action.Increase ){
-                            driver.decreasePriority( 1 );
+                        if (previousTiming.lastAction == Action.Increase) {
+                            driver.decreasePriority(1);
                             mostRecentTiming.lastAction = Action.Decrease;
-                        }
-                        else if( previousTiming.lastAction == Action.Decrease ){
-                            driver.increasePriority( 1 );
+                        } else if (previousTiming.lastAction == Action.Decrease) {
+                            driver.increasePriority(1);
                             mostRecentTiming.lastAction = Action.Increase;
-                        }
-                        //there was no last action.  Try a random action
-                        else{
-                            System.out.println( "Significant performance change without a previous action: RANDOM action");
+                        } //there was no last action.  Try a random action
+                        else {
+                            log.debug("Significant performance change without a previous action: RANDOM action");
                             Action randomAction = getRandomAction();
                             mostRecentTiming.lastAction = randomAction;
-                            randomAction.execute( driver );
+                            randomAction.execute(driver);
                         }
                     }
-                }
-                else{
+                } else {
                     mostRecentTiming.lastAction = Action.None;
                 }
             }
 
-            System.out.println( "Action="+mostRecentTiming.lastAction+", current priority="+driver.getPriority() );
+            log.debug("Action=" + mostRecentTiming.lastAction + ", current priority=" + driver.getPriority());
             //take our most recent timings and roll them into the previous timings
-            lastTimings.put( driver.getStage(), mostRecentTiming );
+            lastTimings.put(driver.getStage(), mostRecentTiming);
         }
     }
 
-    private Action getRandomAction()
-    {
+    private Action getRandomAction() {
         int val = new Random().nextInt();
-        if( val < 0 ) val *= -1;
+        if (val < 0) {
+            val *= -1;
+        }
         int actionVal = val % 3;
-        switch( actionVal ){
-            case 0: return Action.None;
-            case 1: return Action.Increase;
-            case 2: return Action.Decrease;
-            default: throw new IllegalStateException();
+        switch (actionVal) {
+            case 0:
+                return Action.None;
+            case 1:
+                return Action.Increase;
+            case 2:
+                return Action.Decrease;
+            default:
+                throw new IllegalStateException();
         }
     }
-
     /**
      * Holds value of property allowableDelta.
      */
@@ -196,5 +205,4 @@ public class ExperimentalDriverControlSt
     public void setAllowableDelta(long allowableDelta) {
         this.allowableDelta = allowableDelta;
     }
-
 }

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java Wed Sep 22 21:57:09 2010
@@ -20,20 +20,6 @@ package org.apache.commons.pipeline.driv
 import junit.framework.*;
 import static org.apache.commons.pipeline.StageDriver.State.*;
 import static org.apache.commons.pipeline.driver.FaultTolerance.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.Feeder;
-import org.apache.commons.pipeline.Stage;
-import org.apache.commons.pipeline.StageContext;
-import org.apache.commons.pipeline.StageDriver;
-import org.apache.commons.pipeline.StageException;
-import org.apache.commons.pipeline.driver.FaultTolerance;
 
 public class WallClockThresholdDriverControlStrategyTest extends TestCase {
     
@@ -55,7 +41,7 @@ public class WallClockThresholdDriverCon
 
     public void testCPUBoundControl() throws Exception
     {
-        System.out.println( "WallClock: testCPUBoundControl");
+        //System.out.println( "WallClock: testCPUBoundControl");
         CountingDriverController controller = new CountingDriverController();
         controller.setMinimumEventsToHandle( 10 );
         controller.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() );
@@ -67,7 +53,7 @@ public class WallClockThresholdDriverCon
     
     public void testIOBoundControl() throws Exception
     {
-        System.out.println( "WallClock: testIOBoundControl");
+        //System.out.println( "WallClock: testIOBoundControl");
         CountingDriverController controller = new CountingDriverController();
         controller.setMinimumEventsToHandle( 10 );
         controller.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() );

Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Wed Sep 22 21:57:09 2010
@@ -49,7 +49,7 @@ public class KeyWaitBufferStageTest exte
      * data waiting for notify() to be called with an appropriate event.
      */
     public void testProcessAndNotify() throws Exception {
-        System.out.println("notify");
+        //System.out.println("processAndNotify");
         
         String obj = "Hello, World!";
         KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();