You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2011/07/08 09:55:37 UTC

svn commit: r1144186 - in /servicemix/smx5/trunk/core/src: main/scala/org/apache/servicemix/core/ test/scala/org/apache/servicemix/core/

Author: gnodet
Date: Fri Jul  8 07:55:37 2011
New Revision: 1144186

URL: http://svn.apache.org/viewvc?rev=1144186&view=rev
Log:
[smx5] Experiment with a camel profiling strategy

Added:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala
      - copied, changed from r1141262, servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala
Removed:
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala
Modified:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala?rev=1144186&r1=1144185&r2=1144186&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala Fri Jul  8 07:55:37 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicemix.core
 
 import org.apache.camel.spi.InterceptStrategy

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala?rev=1144186&r1=1144185&r2=1144186&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala Fri Jul  8 07:55:37 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicemix.core
 
 import collection.mutable.MutableList

Added: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala?rev=1144186&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala (added)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala Fri Jul  8 07:55:37 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel._
+import model.{RouteDefinition, ChoiceDefinition, WhenDefinition, ProcessorDefinition}
+import processor.DelegateAsyncProcessor
+import spi.{RouteContext, ProcessorFactory}
+import collection.mutable.LinkedHashMap
+
+class ProfilerStrategy extends ProcessorFactory {
+
+  val proc = new LinkedHashMap[ProcessorDefinition[_], Stats]
+
+  def createProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_]) : Processor = {
+    val proc = definition.createProcessor(routeContext)
+    if (proc != null) {
+      new ProcessorWrapper(routeContext.getCamelContext, definition, proc, getStats(definition))
+    } else {
+      null
+    }
+  }
+
+  def createChildProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_], mandatory: Boolean) : Processor = {
+    val proc = routeContext.createProcessor(definition)
+    if (proc != null) {
+      new ProcessorWrapper(routeContext.getCamelContext, definition, proc, getStats(definition))
+    } else {
+      null
+    }
+  }
+
+  def getStats(definition: ProcessorDefinition[_]) : Stats = {
+    if (definition == null) {
+      null
+    } else {
+      proc.getOrElseUpdate(definition, new Stats(definition, getStats(definition.getParent())))
+    }
+  }
+
+  class ProcessorWrapper(context: CamelContext, definition: ProcessorDefinition[_], target: Processor, stats: Stats) extends DelegateAsyncProcessor(target) {
+    override def process(exchange: Exchange, callback: AsyncCallback) : Boolean = {
+      val t0 = System.nanoTime()
+      try {
+        super.process(exchange, callback)
+      } finally {
+        val t1 = System.nanoTime()
+        stats.addTime(t1 - t0)
+      }
+    }
+    override def toString: String = {
+      "ProfilerWrapper[" + processor + "]"
+    }
+  }
+
+}
+
+class Stats(_definition: ProcessorDefinition[_], _parent : Stats) {
+  var count : Long = 0
+  var time : Long = 0
+  var total : Long = 0
+  def parent = _parent
+  def definition = _definition
+
+  def addTime(nanos: Long) {
+    count = count + 1
+    time = time + nanos
+    total = total + nanos
+    if (parent != null) {
+      parent.addChildTime(nanos)
+    }
+  }
+
+  def timeIncludesChildren() = {
+    definition.getClass != classOf[RouteDefinition]
+  }
+
+  def addChildTime(nanos: Long) {
+    if (timeIncludesChildren()) {
+      time = time - nanos
+    } else {
+      total = total + nanos
+      if (parent != null) {
+        parent.addChildTime(nanos)
+      }
+    }
+  }
+}
+

Copied: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala (from r1141262, servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala)
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala?p2=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala&p1=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala&r1=1141262&r2=1144186&rev=1144186&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala (original)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala Fri Jul  8 07:55:37 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.servicemix.core
 
 import _root_.scala.Predef._
@@ -6,13 +22,14 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
 import org.apache.camel.scala.dsl.builder.RouteBuilder
 import org.apache.camel.impl.{DefaultProducerTemplate, DefaultCamelContext}
-import org.apache.camel.component.mock.MockEndpoint
-import org.apache.camel.spi.InterceptStrategy
 import org.apache.camel.model.ProcessorDefinition
-import org.apache.camel.{Processor, CamelContext, ProducerTemplate}
+import collection.mutable.Map
+import collection.immutable.List
+import java.util.concurrent.TimeUnit
+import org.apache.camel.{Exchange, ProducerTemplate}
 
 @RunWith(classOf[JUnitRunner])
-class InterceptionTest extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+class ProfilerStrategyTest extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
 
   override protected def beforeAll() = {
   }
@@ -21,6 +38,10 @@ class InterceptionTest extends FunSuite 
 
   }
 
+  def sleep() = {
+    Thread.sleep(2)
+  }
+
   test("testCamel") {
 
     val context = new DefaultCamelContext();
@@ -28,9 +49,12 @@ class InterceptionTest extends FunSuite 
       "direct:a" ==> {
         to("mock:polyglot")
         choice {
-          when(_.in == "<hello/>") to ("mock:english")
+          when( (e: Exchange) => { sleep(); e.in == "<hello/>" }) {
+            to ("mock:english")
+          }
           when(_.in == "<hallo/>") {
             to("mock:dutch")
+            delay( 2 ms )
             to("mock:german")
           }
           otherwise to ("mock:french")
@@ -38,18 +62,48 @@ class InterceptionTest extends FunSuite 
       }
     });
 
-    context.addInterceptStrategy(new BreadcrumbStrategy)
-
+    val strategy = new ProfilerStrategy
+    context.setProcessorFactory(strategy)
     context.start()
 
     val template : ProducerTemplate = new DefaultProducerTemplate(context)
-    val englishEndpoint : MockEndpoint = context.getEndpoint("mock:english", classOf[MockEndpoint]);
-    englishEndpoint.expectedMessageCount(1)
 
     template.start()
-    template.sendBody("direct:a", "<hello/>")
+    val values = List("<hello/>", "<hallo/>", "<bonjour/>")
+    val rnd = new scala.util.Random
+
+    val t0 = System.nanoTime()
+    for (i <- 0 until 1000) {
+      template.sendBody("direct:a", values(rnd.nextInt(values.size)))
+    }
+    val t1 = System.nanoTime()
+    System.out.println("Total time: " + TimeUnit.MILLISECONDS.convert(t1 - t0, TimeUnit.NANOSECONDS))
+
+    print(strategy.proc)
+  }
 
-    englishEndpoint.assertIsSatisfied()
+  def print(proc: Map[ProcessorDefinition[_], Stats]) {
+    System.out.println("%-40s %8s %8s %8s".format("Processor", "Count", "Time", "Total"))
+    print(proc, null, "")
+  }
+
+  def print(proc: Map[ProcessorDefinition[_], Stats], parent: Stats, indent: String) {
+    for ((p, s) <- proc) {
+      if (s.parent == parent) {
+        var name = indent + p.toString
+        val max = 40
+        if (name.length() > max) {
+          name = name.substring(0, max - 4) + "...]"
+        } else {
+          while (name.length() < max) {
+            name = name + " "
+          }
+        }
+
+        System.out.println("%s %8d %8d %8d".format(name, s.count, TimeUnit.MILLISECONDS.convert(s.time, TimeUnit.NANOSECONDS), TimeUnit.MILLISECONDS.convert(s.total, TimeUnit.NANOSECONDS)))
+        print(proc, s, indent + "  ")
+      }
+    }
   }
 
 }
\ No newline at end of file