You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2011/07/08 09:55:37 UTC
svn commit: r1144186 - in /servicemix/smx5/trunk/core/src:
main/scala/org/apache/servicemix/core/ test/scala/org/apache/servicemix/core/
Author: gnodet
Date: Fri Jul 8 07:55:37 2011
New Revision: 1144186
URL: http://svn.apache.org/viewvc?rev=1144186&view=rev
Log:
[smx5] Experiment with a camel profiling strategy
Added:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala
- copied, changed from r1141262, servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala
Removed:
servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala
Modified:
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala
Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala?rev=1144186&r1=1144185&r2=1144186&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala Fri Jul 8 07:55:37 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.servicemix.core
import org.apache.camel.spi.InterceptStrategy
Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala?rev=1144186&r1=1144185&r2=1144186&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala Fri Jul 8 07:55:37 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.servicemix.core
import collection.mutable.MutableList
Added: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala?rev=1144186&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala (added)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala Fri Jul 8 07:55:37 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel._
+import model.{RouteDefinition, ChoiceDefinition, WhenDefinition, ProcessorDefinition}
+import processor.DelegateAsyncProcessor
+import spi.{RouteContext, ProcessorFactory}
+import collection.mutable.LinkedHashMap
+
+/** Camel ProcessorFactory that wraps every processor it creates so that its processing time is recorded in a Stats. */
+class ProfilerStrategy extends ProcessorFactory {
+
+  // Stats per processor definition, kept in insertion order.
+  // getStats registers a parent's entry before the entry of its child.
+  val proc = new LinkedHashMap[ProcessorDefinition[_], Stats]
+
+  /** Builds the processor through the definition itself and wraps it; yields null when nothing was built. */
+  def createProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_]) : Processor =
+    profiled(routeContext, definition, definition.createProcessor(routeContext))
+
+  /** Builds the processor through the route context and wraps it; `mandatory` is not consulted. */
+  def createChildProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_], mandatory: Boolean) : Processor =
+    profiled(routeContext, definition, routeContext.createProcessor(definition))
+
+  // Shared wrapping step: a null target is passed through untouched and no Stats entry is registered for it.
+  private def profiled(routeContext: RouteContext, definition: ProcessorDefinition[_], target: Processor) : Processor =
+    if (target == null) null
+    else new ProcessorWrapper(routeContext.getCamelContext, definition, target, getStats(definition))
+
+  /** Returns the Stats registered for `definition`, creating it (and those of its ancestors) on first use; null for null. */
+  def getStats(definition: ProcessorDefinition[_]) : Stats =
+    if (definition == null) null
+    else proc.get(definition) match {
+      case Some(known) => known
+      case None =>
+        // Resolve the parent first so that it is registered ahead of this definition.
+        val fresh = new Stats(definition, getStats(definition.getParent()))
+        proc.put(definition, fresh)
+        fresh
+    }
+
+  /** Delegating processor that reports the elapsed time of every process() call to `stats`. */
+  class ProcessorWrapper(context: CamelContext, definition: ProcessorDefinition[_], target: Processor, stats: Stats) extends DelegateAsyncProcessor(target) {
+    override def process(exchange: Exchange, callback: AsyncCallback) : Boolean = {
+      val startedAt = System.nanoTime()
+      // NOTE(review): only the time until super.process returns is measured; work that completes later
+      // through the callback is presumably not counted - confirm whether that is intended.
+      try super.process(exchange, callback)
+      finally stats.addTime(System.nanoTime() - startedAt)
+    }
+
+    override def toString: String = "ProfilerWrapper[" + processor + "]"
+  }
+
+}
+
+/** Timing counters for one processor definition, linked to the Stats of its parent definition (null when there is none). */
+class Stats(_definition: ProcessorDefinition[_], _parent : Stats) {
+  var count : Long = 0 // number of addTime calls
+  var time : Long = 0  // nanos passed to addTime, minus child nanos when timeIncludesChildren()
+  var total : Long = 0 // nanos passed to addTime, plus child nanos when not timeIncludesChildren()
+  def parent = _parent
+  def definition = _definition
+
+  /** Records one measured execution lasting `nanos` and reports it to the parent, if any. */
+  def addTime(nanos: Long) : Unit = {
+    count += 1
+    time += nanos
+    total += nanos
+    if (parent != null) parent.addChildTime(nanos)
+  }
+
+  /** True unless the class of the definition is exactly RouteDefinition. */
+  def timeIncludesChildren() = !(definition.getClass == classOf[RouteDefinition])
+
+  /** Accounts for `nanos` spent in a child: deducted from `time` here, or added to `total` and passed upwards. */
+  def addChildTime(nanos: Long) : Unit = {
+    if (!timeIncludesChildren()) {
+      // Per timeIncludesChildren() the child is not covered here: add it to total and keep propagating.
+      total += nanos
+      if (parent != null) parent.addChildTime(nanos)
+    } else {
+      // Per timeIncludesChildren() the child is already covered here: deduct it from time only.
+      time -= nanos
+    }
+  }
+}
+
Copied: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala (from r1141262, servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala)
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala?p2=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala&p1=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala&r1=1141262&r2=1144186&rev=1144186&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/InterceptionTest.scala (original)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala Fri Jul 8 07:55:37 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.servicemix.core
import _root_.scala.Predef._
@@ -6,13 +22,14 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
import org.apache.camel.scala.dsl.builder.RouteBuilder
import org.apache.camel.impl.{DefaultProducerTemplate, DefaultCamelContext}
-import org.apache.camel.component.mock.MockEndpoint
-import org.apache.camel.spi.InterceptStrategy
import org.apache.camel.model.ProcessorDefinition
-import org.apache.camel.{Processor, CamelContext, ProducerTemplate}
+import collection.mutable.Map
+import collection.immutable.List
+import java.util.concurrent.TimeUnit
+import org.apache.camel.{Exchange, ProducerTemplate}
@RunWith(classOf[JUnitRunner])
-class InterceptionTest extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+class ProfilerStrategyTest extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
override protected def beforeAll() = {
}
@@ -21,6 +38,10 @@ class InterceptionTest extends FunSuite
}
+ def sleep() = {
+ Thread.sleep(2)
+ }
+
test("testCamel") {
val context = new DefaultCamelContext();
@@ -28,9 +49,12 @@ class InterceptionTest extends FunSuite
"direct:a" ==> {
to("mock:polyglot")
choice {
- when(_.in == "<hello/>") to ("mock:english")
+ when( (e: Exchange) => { sleep(); e.in == "<hello/>" }) {
+ to ("mock:english")
+ }
when(_.in == "<hallo/>") {
to("mock:dutch")
+ delay( 2 ms )
to("mock:german")
}
otherwise to ("mock:french")
@@ -38,18 +62,48 @@ class InterceptionTest extends FunSuite
}
});
- context.addInterceptStrategy(new BreadcrumbStrategy)
-
+ val strategy = new ProfilerStrategy
+ context.setProcessorFactory(strategy)
context.start()
val template : ProducerTemplate = new DefaultProducerTemplate(context)
- val englishEndpoint : MockEndpoint = context.getEndpoint("mock:english", classOf[MockEndpoint]);
- englishEndpoint.expectedMessageCount(1)
template.start()
- template.sendBody("direct:a", "<hello/>")
+ val values = List("<hello/>", "<hallo/>", "<bonjour/>")
+ val rnd = new scala.util.Random
+
+ val t0 = System.nanoTime()
+ for (i <- 0 until 1000) {
+ template.sendBody("direct:a", values(rnd.nextInt(values.size)))
+ }
+ val t1 = System.nanoTime()
+ System.out.println("Total time: " + TimeUnit.MILLISECONDS.convert(t1 - t0, TimeUnit.NANOSECONDS))
+
+ print(strategy.proc)
+ }
- englishEndpoint.assertIsSatisfied()
+ def print(proc: Map[ProcessorDefinition[_], Stats]) {
+ System.out.println("%-40s %8s %8s %8s".format("Processor", "Count", "Time", "Total"))
+ print(proc, null, "")
+ }
+
+ def print(proc: Map[ProcessorDefinition[_], Stats], parent: Stats, indent: String) {
+ for ((p, s) <- proc) {
+ if (s.parent == parent) {
+ var name = indent + p.toString
+ val max = 40
+ if (name.length() > max) {
+ name = name.substring(0, max - 4) + "...]"
+ } else {
+ while (name.length() < max) {
+ name = name + " "
+ }
+ }
+
+ System.out.println("%s %8d %8d %8d".format(name, s.count, TimeUnit.MILLISECONDS.convert(s.time, TimeUnit.NANOSECONDS), TimeUnit.MILLISECONDS.convert(s.total, TimeUnit.NANOSECONDS)))
+ print(proc, s, indent + " ")
+ }
+ }
}
}
\ No newline at end of file