You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by nu...@apache.org on 2010/09/22 23:57:09 UTC
svn commit: r1000252 - in /commons/sandbox/pipeline/trunk: ./
src/main/java/org/apache/commons/pipeline/driver/control/
src/test/java/org/apache/commons/pipeline/driver/control/
src/test/java/org/apache/commons/pipeline/stage/
Author: nuttycom
Date: Wed Sep 22 21:57:09 2010
New Revision: 1000252
URL: http://svn.apache.org/viewvc?rev=1000252&view=rev
Log:
Minor formatting improvements; changed instances of System.out.println to log.debug.
Added:
commons/sandbox/pipeline/trunk/.gitignore
Modified:
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
Added: commons/sandbox/pipeline/trunk/.gitignore
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/.gitignore?rev=1000252&view=auto
==============================================================================
--- commons/sandbox/pipeline/trunk/.gitignore (added)
+++ commons/sandbox/pipeline/trunk/.gitignore Wed Sep 22 21:57:09 2010
@@ -0,0 +1,2 @@
+nbproject
+target
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java Wed Sep 22 21:57:09 2010
@@ -14,43 +14,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.commons.pipeline.driver.control;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Stage;
/**
* An implementation of DriverControlStrategy that measures stage execution times
* and increases thread counts that are taking longer than other stages on average
- *
- * @author mirror
*/
public class EqualizingDriverControlStrategy implements DriverControlStrategy {
-
+
+ Log log = LogFactory.getLog(EqualizingDriverControlStrategy.class);
+
private static class Tuple {
+
private int count = 0;
private long duration = 0;
-
- Tuple() { }
-
+
+ Tuple() {
+ }
+
public void add(long duration) {
count++;
this.duration += duration;
}
}
-
+
/** Creates a new instance of EqualizingDriverControlStrategy */
public EqualizingDriverControlStrategy() {
}
-
+
public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
- if (events.isEmpty()) return;
-
- Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+ if (events.isEmpty()) {
+ return;
+ }
+
+ Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>();
long total = 0;
for (StageProcessTimingEvent ev : events) {
Tuple tuple = timings.get((Stage) ev.getSource());
@@ -58,37 +63,37 @@ public class EqualizingDriverControlStra
tuple = new Tuple();
timings.put((Stage) ev.getSource(), tuple);
}
-
+
tuple.add(ev.getLatency());
total += ev.getLatency();
}
-
- //System.out.println("Events handled: " + events.size());
- System.out.print("Stage latencies: ");
- for (Map.Entry<Stage,Tuple> entry : timings.entrySet()) {
- System.out.print(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; ");
- }
- System.out.println();
- //System.out.println("Total latency: " + total);
-
+
+ if (log.isDebugEnabled()) {
+ log.debug("Events handled: " + events.size());
+ log.debug("Stage latencies: ");
+ for (Map.Entry<Stage, Tuple> entry : timings.entrySet()) {
+ log.debug(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; ");
+ }
+ log.debug("Total latency: " + total);
+ }
+
double mean = total / events.size();
- //System.out.println("Mean latency: " + mean);
-
+ //log.debug("Mean latency: " + mean);
+
for (PrioritizableStageDriver driver : drivers) {
Tuple tuple = timings.get(driver.getStage());
if (tuple != null) {
- long averageDuration = tuple.duration / tuple.count;
- if (averageDuration > mean + allowableDelta) {
- System.out.println("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
- driver.increasePriority(1);
- } else if (averageDuration < mean - allowableDelta) {
- driver.decreasePriority(1);
- System.out.println("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+ long averageDuration = tuple.duration / tuple.count;
+ if (averageDuration > mean + allowableDelta) {
+ log.debug("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
+ driver.increasePriority(1);
+ } else if (averageDuration < mean - allowableDelta) {
+ driver.decreasePriority(1);
+ log.debug("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration);
}
}
}
}
-
/**
* Holds value of property allowableDelta.
*/
@@ -109,5 +114,4 @@ public class EqualizingDriverControlStra
public void setAllowableDelta(long allowableDelta) {
this.allowableDelta = allowableDelta;
}
-
}
Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java (original)
+++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java Wed Sep 22 21:57:09 2010
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.commons.pipeline.driver.control;
import java.util.HashMap;
@@ -22,6 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Stage;
/**
@@ -29,10 +30,11 @@ import org.apache.commons.pipeline.Stage
* increases and decreases priorities to see if performance is improved. If
* a performance improvement is found, additional experiments are done in the
* same direction
- *
- * @author braeckel
*/
public class ExperimentalDriverControlStrategy implements DriverControlStrategy {
+
+ private Log log = LogFactory.getLog(ExperimentalDriverControlStrategy.class);
+
/**
* The minimum time difference (in percent) between different analyses of a stage for
* modifications to take place. In other words, if a stage is stable with
@@ -40,44 +42,57 @@ public class ExperimentalDriverControlSt
*/
private int minDifferencePercent = 3;
-
private enum Action {
- Decrease { void execute( PrioritizableStageDriver driver ){driver.decreasePriority( 1 ); } },
- Increase { void execute( PrioritizableStageDriver driver ){driver.increasePriority( 1 ); } },
- None { void execute( PrioritizableStageDriver driver ){ /*do nothing*/ } };
+
+ Decrease {
+
+ void execute(PrioritizableStageDriver driver) {
+ driver.decreasePriority(1);
+ }
+ },
+ Increase {
+
+ void execute(PrioritizableStageDriver driver) {
+ driver.increasePriority(1);
+ }
+ },
+ None {
+
+ void execute(PrioritizableStageDriver driver) { /*do nothing*/ }
+ };
abstract void execute(PrioritizableStageDriver driver);
}
private class Tuple {
+
private int count = 0;
private long duration = 0;
private Action lastAction = Action.None;
- Tuple() { }
+ Tuple() {
+ }
public void add(long duration) {
count++;
this.duration += duration;
}
}
-
- private Map<Stage, Tuple> lastTimings = new HashMap<Stage,Tuple>();
+ private Map<Stage, Tuple> lastTimings = new HashMap<Stage, Tuple>();
/** Creates a new instance of EqualizingDriverControlStrategy */
public ExperimentalDriverControlStrategy() {
}
- public ExperimentalDriverControlStrategy( int minDifferencePercent ){
- if( minDifferencePercent < 0 || minDifferencePercent > 100 )
- {
- throw new IllegalArgumentException( "Minimum difference percent must be between 0 and 100" );
+ public ExperimentalDriverControlStrategy(int minDifferencePercent) {
+ if (minDifferencePercent < 0 || minDifferencePercent > 100) {
+ throw new IllegalArgumentException("Minimum difference percent must be between 0 and 100");
}
this.minDifferencePercent = minDifferencePercent;
}
public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) {
- Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>();
+ Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>();
for (StageProcessTimingEvent ev : events) {
Tuple tuple = timings.get(ev.getSource());
if (tuple == null) {
@@ -90,92 +105,86 @@ public class ExperimentalDriverControlSt
for (PrioritizableStageDriver driver : drivers) {
Tuple mostRecentTiming = timings.get(driver.getStage());
- Tuple previousTiming = lastTimings.get( driver.getStage() );
+ Tuple previousTiming = lastTimings.get(driver.getStage());
double avgMostRecentDuration = mostRecentTiming.duration / mostRecentTiming.count;
//first time around, try increasing priority
- if( previousTiming == null )
- {
+ if (previousTiming == null) {
mostRecentTiming.lastAction = Action.Increase;
- driver.increasePriority( 1 );
+ driver.increasePriority(1);
}
- if( previousTiming != null ){
+ if (previousTiming != null) {
double avgPreviousTiming = previousTiming.duration / previousTiming.count;
//if the performance has decreased significantly...
double timingDifference = avgPreviousTiming - avgMostRecentDuration;
- System.out.println( "Performance went from "+avgPreviousTiming + " to "+avgMostRecentDuration +"("+timingDifference+")");
+ log.debug("Performance went from " + avgPreviousTiming + " to " + avgMostRecentDuration + "(" + timingDifference + ")");
//if the timing difference was significant enough to work with...
double minDifference = avgPreviousTiming * (minDifferencePercent / 100.0);
- if( Math.abs( timingDifference ) >= minDifference )
- {
+ if (Math.abs(timingDifference) >= minDifference) {
//if the diff is positive, we have a performance improvement
- if( timingDifference > 0 )
- {
+ if (timingDifference > 0) {
//continue whatever we did last time to try and get further
//improvement
- if( previousTiming.lastAction == Action.Increase ){
- driver.increasePriority( 1 );
+ if (previousTiming.lastAction == Action.Increase) {
+ driver.increasePriority(1);
mostRecentTiming.lastAction = Action.Increase;
- }
- else if( previousTiming.lastAction == Action.Decrease ){
- driver.decreasePriority( 1 );
+ } else if (previousTiming.lastAction == Action.Decrease) {
+ driver.decreasePriority(1);
mostRecentTiming.lastAction = Action.Decrease;
- }
- //there was no last action. Try a random action
- else{
- System.out.println( "Significant performance change without a previous action: RANDOM action");
+ } //there was no last action. Try a random action
+ else {
+ log.debug("Significant performance change without a previous action: RANDOM action");
Action randomAction = getRandomAction();
mostRecentTiming.lastAction = randomAction;
- randomAction.execute( driver );
+ randomAction.execute(driver);
}
- }
- //there was a performance degradation, reverse our last step
- else
- {
+ } //there was a performance degradation, reverse our last step
+ else {
//reverse whatever we did last time to try and get further
//improvement
- if( previousTiming.lastAction == Action.Increase ){
- driver.decreasePriority( 1 );
+ if (previousTiming.lastAction == Action.Increase) {
+ driver.decreasePriority(1);
mostRecentTiming.lastAction = Action.Decrease;
- }
- else if( previousTiming.lastAction == Action.Decrease ){
- driver.increasePriority( 1 );
+ } else if (previousTiming.lastAction == Action.Decrease) {
+ driver.increasePriority(1);
mostRecentTiming.lastAction = Action.Increase;
- }
- //there was no last action. Try a random action
- else{
- System.out.println( "Significant performance change without a previous action: RANDOM action");
+ } //there was no last action. Try a random action
+ else {
+ log.debug("Significant performance change without a previous action: RANDOM action");
Action randomAction = getRandomAction();
mostRecentTiming.lastAction = randomAction;
- randomAction.execute( driver );
+ randomAction.execute(driver);
}
}
- }
- else{
+ } else {
mostRecentTiming.lastAction = Action.None;
}
}
- System.out.println( "Action="+mostRecentTiming.lastAction+", current priority="+driver.getPriority() );
+ log.debug("Action=" + mostRecentTiming.lastAction + ", current priority=" + driver.getPriority());
//take our most recent timings and roll them into the previous timings
- lastTimings.put( driver.getStage(), mostRecentTiming );
+ lastTimings.put(driver.getStage(), mostRecentTiming);
}
}
- private Action getRandomAction()
- {
+ private Action getRandomAction() {
int val = new Random().nextInt();
- if( val < 0 ) val *= -1;
+ if (val < 0) {
+ val *= -1;
+ }
int actionVal = val % 3;
- switch( actionVal ){
- case 0: return Action.None;
- case 1: return Action.Increase;
- case 2: return Action.Decrease;
- default: throw new IllegalStateException();
+ switch (actionVal) {
+ case 0:
+ return Action.None;
+ case 1:
+ return Action.Increase;
+ case 2:
+ return Action.Decrease;
+ default:
+ throw new IllegalStateException();
}
}
-
/**
* Holds value of property allowableDelta.
*/
@@ -196,5 +205,4 @@ public class ExperimentalDriverControlSt
public void setAllowableDelta(long allowableDelta) {
this.allowableDelta = allowableDelta;
}
-
}
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java Wed Sep 22 21:57:09 2010
@@ -20,20 +20,6 @@ package org.apache.commons.pipeline.driv
import junit.framework.*;
import static org.apache.commons.pipeline.StageDriver.State.*;
import static org.apache.commons.pipeline.driver.FaultTolerance.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.pipeline.Feeder;
-import org.apache.commons.pipeline.Stage;
-import org.apache.commons.pipeline.StageContext;
-import org.apache.commons.pipeline.StageDriver;
-import org.apache.commons.pipeline.StageException;
-import org.apache.commons.pipeline.driver.FaultTolerance;
public class WallClockThresholdDriverControlStrategyTest extends TestCase {
@@ -55,7 +41,7 @@ public class WallClockThresholdDriverCon
public void testCPUBoundControl() throws Exception
{
- System.out.println( "WallClock: testCPUBoundControl");
+ //System.out.println( "WallClock: testCPUBoundControl");
CountingDriverController controller = new CountingDriverController();
controller.setMinimumEventsToHandle( 10 );
controller.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() );
@@ -67,7 +53,7 @@ public class WallClockThresholdDriverCon
public void testIOBoundControl() throws Exception
{
- System.out.println( "WallClock: testIOBoundControl");
+ //System.out.println( "WallClock: testIOBoundControl");
CountingDriverController controller = new CountingDriverController();
controller.setMinimumEventsToHandle( 10 );
controller.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() );
Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?rev=1000252&r1=1000251&r2=1000252&view=diff
==============================================================================
--- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (original)
+++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Wed Sep 22 21:57:09 2010
@@ -49,7 +49,7 @@ public class KeyWaitBufferStageTest exte
* data waiting for notify() to be called with an appropriate event.
*/
public void testProcessAndNotify() throws Exception {
- System.out.println("notify");
+ //System.out.println("processAndNotify");
String obj = "Hello, World!";
KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();