You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2011/07/27 14:40:49 UTC

svn commit: r1151448 - in /servicemix/smx5/trunk/core/src: main/scala/org/apache/servicemix/core/ test/scala/org/apache/servicemix/core/

Author: gnodet
Date: Wed Jul 27 12:40:47 2011
New Revision: 1151448

URL: http://svn.apache.org/viewvc?rev=1151448&view=rev
Log:
Allow enabling / disabling of some strategies globally or at the CamelContext or Route level

Added:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ServiceMixContainer.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Switchable.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/SwitchableTest.scala
Modified:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala?rev=1151448&r1=1151447&r2=1151448&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala Wed Jul 27 12:40:47 2011
@@ -16,11 +16,10 @@
  */
 package org.apache.servicemix.core
 
-import org.apache.camel.{AsyncCallback, Exchange, Processor, CamelContext}
 import org.apache.camel.processor.{DelegateProcessor, DelegateAsyncProcessor}
 import org.apache.camel.processor.aggregate.{AggregationStrategy, AggregateProcessor}
-import collection.mutable.HashSet
 import collection.Iterable
+import org.apache.camel._
 
 /**
  * The ServiceMix bread crumb strategy adds a header to the message to ensure we can follow the message throughout
@@ -28,11 +27,11 @@ import collection.Iterable
  */
 class Breadcrumbs extends DelegateProcessorFactory {
 
-  import Breadcrumbs.{hasBreadCrumb, addBreadCrumb, getBreadCrumb}
+  import Breadcrumbs._
 
   def create(delegate: Processor) = new DelegateAsyncProcessor(process(delegate)) {
     override def process(exchange: Exchange, callback: AsyncCallback) = {
-      if (!hasBreadCrumb(exchange)) {
+      if (isEnabled(exchange) && !hasBreadCrumb(exchange)) {
         addBreadCrumb(exchange)
       }
       processNext(exchange, callback)
@@ -50,10 +49,11 @@ class Breadcrumbs extends DelegateProces
       val strategy = new AggregationStrategy {
         def aggregate(oldExchange: Exchange, newExchange: Exchange) : Exchange = {
           val ex = oldstrat.aggregate(oldExchange, newExchange)
-          if (oldExchange == null)
-            addBreadCrumb(ex, List(getBreadCrumb(newExchange)))
-          else
-            addBreadCrumb(ex, List(getBreadCrumb(oldExchange), getBreadCrumb(newExchange)))
+          if (isEnabled(ex)) {
+            val bcs = if (oldExchange == null) getBreadCrumbs(ex) ++ getBreadCrumbs(newExchange)
+                      else getBreadCrumbs(ex) ++ getBreadCrumbs(oldExchange) ++ getBreadCrumbs(newExchange)
+            setBreadCrumbs(ex, bcs)
+          }
           ex
         }
       }
@@ -63,7 +63,7 @@ class Breadcrumbs extends DelegateProces
   }
 }
 
-object Breadcrumbs {
+object Breadcrumbs extends Switchable {
 
   /**
    * ServiceMix bread crumb header name
@@ -90,61 +90,36 @@ object Breadcrumbs {
   /**
    * Add a ServiceMix bread crumb to an Exchange
    */
-  def addBreadCrumb(exchange: Exchange) : Unit = setBreadCrumb(exchange, exchange.getContext.getUuidGenerator.generateUuid())
-
-  /**
-   * Add a number of ServiceMix bread crumbs to an Exchange
-   */
-  def addBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit = {
-    var bcs = new HashSet[String]()
-    bcs = bcs ++ getBreadCrumbs(exchange)
-    for (bc <- breadcrumbs) {
-      bcs = bcs ++ getBreadCrumbs(bc)
-    }
-    setBreadCrumb(exchange, bcs)
+  def addBreadCrumb(exchange: Exchange) {
+    setBreadCrumb(exchange, exchange.getContext.getUuidGenerator.generateUuid())
   }
 
   /**
    * Set the ServiceMix bread crumb to an Exchange
    */
-  def setBreadCrumb(exchange: Exchange, breadcrumb: String) : Unit = exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB, breadcrumb)
+  def setBreadCrumb(exchange: Exchange, breadcrumb: String) {
+    exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB, breadcrumb)
+  }
 
   /**
    * Set the ServiceMix bread crumbs to an Exchange
    */
-  def setBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit = setBreadCrumb(exchange, breadcrumbs.mkString(","))
+  def setBreadCrumbs(exchange: Exchange, breadcrumbs: Iterable[String]) {
+    setBreadCrumb(exchange, breadcrumbs.mkString(","))
+  }
 
   /**
-   * Enable bread crumbs on the target CamelContext
+   * Enable bread crumbs on the ServiceMix Container
    */
-  def enable(context: CamelContext) = {
-    context.getProcessorFactory match {
-      case global: GlobalProcessorFactory => global.addFactory(new Breadcrumbs)
-      case _ => //unable to enable bread crumbs
-    }
+  def register(container: ServiceMixContainer = ServiceMixContainer.instance) {
+    container.register(classOf[Breadcrumbs])
   }
 
   /**
-   * Disable bread crumbs on the target CamelContext
+   * Disable bread crumbs on the ServiceMix Container
    */
-  def disable(context: CamelContext) = {
-    context.getProcessorFactory match {
-      case global: GlobalProcessorFactory => for (breadcrumb <- global.factories.filter(_.isInstanceOf[Breadcrumbs])) {
-        global.removeFactory(breadcrumb)
-      }
-      case _ => //unable to enable bread crumbs
-    }
-  }
-
-  private def nullOrElse[S,T](value: S)(function: S => T) : T = if (value == null) {
-    null.asInstanceOf[T]
-  } else {
-    function(value)
-  }
-  private def nullOrElse[S,T](value: S, default: T)(function: S => T) : T = if (value == null) {
-    default
-  } else {
-    function(value)
+  def unregister(container: ServiceMixContainer = ServiceMixContainer.instance) {
+    container.unregister(classOf[Breadcrumbs])
   }
 
 }

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala?rev=1151448&r1=1151447&r2=1151448&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala Wed Jul 27 12:40:47 2011
@@ -18,10 +18,11 @@ package org.apache.servicemix.core
 
 import org.apache.camel.spi.{RouteContext, ProcessorFactory}
 import org.apache.camel.model.ProcessorDefinition
-import collection.mutable.ListBuffer
-import org.apache.camel.processor.DelegateAsyncProcessor
-import org.apache.camel.{AsyncProcessor, Processor, AsyncCallback, Exchange}
 import java.util.concurrent.atomic.AtomicInteger
+import collection.mutable.ListBuffer
+import org.apache.camel._
+import processor.DelegateAsyncProcessor
+import GlobalProcessorFactory._
 
 /**
  * Global ServiceMix ProcessorFactory implementation, which will take care of wrapping processors with the additional
@@ -43,37 +44,44 @@ class GlobalProcessorFactory extends Pro
     nullOrElse(definition.createProcessor(context))(new GlobalDelegateProcessor(context, definition, _))
   }
 
-  def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
-    null.asInstanceOf[T]
-  } else {
-    function(value)
-  }
-
   def triggerUpdate(block: => Unit) = {
     block
     version.incrementAndGet()
   }
 
-  def configure(original: AsyncProcessor) : AsyncProcessor = {
-    factories.foldLeft(original){ (delegate: AsyncProcessor, factory: DelegateProcessorFactory) =>
-      factory.create(delegate)
-    }
-  }
+  class GlobalDelegateProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_], target: Processor) extends DelegateAsyncProcessor(target) {
 
-  class GlobalDelegateProcessor(context: RouteContext, definition: ProcessorDefinition[_], target: Processor) extends DelegateAsyncProcessor(target) {
-
-    var currentProcessor = GlobalProcessorFactory.this.configure(getProcessor())
+    var currentProcessor = configure(getProcessor)
     var version = GlobalProcessorFactory.this.version.get()
 
     override def process(exchange: Exchange, callback: AsyncCallback) = {
       // let's check if processor factories have changed and reconfigure things if necessary
       if (version < GlobalProcessorFactory.this.version.get) {
-        currentProcessor = GlobalProcessorFactory.this.configure(getProcessor())
+        currentProcessor = configure(getProcessor)
       }
 
       currentProcessor.process(exchange, callback)
     }
 
     override def toString = "ServiceMix Wrapper[" + processor + "]"
+
+    def configure(original: AsyncProcessor) : AsyncProcessor = {
+      factories.foldLeft(original) { (delegate: AsyncProcessor, factory: DelegateProcessorFactory) => {
+          factory.create(delegate)
+        }
+      }
+    }
+
   }
-}
\ No newline at end of file
+}
+
+object GlobalProcessorFactory {
+
+  private def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
+    null.asInstanceOf[T]
+  } else {
+    function(value)
+  }
+
+}
+

Added: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ServiceMixContainer.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ServiceMixContainer.scala?rev=1151448&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ServiceMixContainer.scala (added)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ServiceMixContainer.scala Wed Jul 27 12:40:47 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel.spi.Container
+import org.apache.camel._
+
+class ServiceMixContainer extends Container {
+
+  val processorFactory = new GlobalProcessorFactory
+
+  def manage(camelContext: CamelContext) {
+    if (camelContext.getProcessorFactory == null) {
+      camelContext.setProcessorFactory(processorFactory)
+    }
+  }
+  def register(factory: Class[_ <: DelegateProcessorFactory]) {
+    if (processorFactory.factories.filter(_.getClass == factory).isEmpty) {
+      processorFactory.addFactory(factory.newInstance())
+    }
+  }
+  def unregister(factory: Class[_ <: DelegateProcessorFactory]) {
+    for (f <- processorFactory.factories.filter(_.getClass == factory)) {
+      processorFactory.removeFactory(f)
+    }
+  }
+
+}
+
+object ServiceMixContainer {
+
+  val instance = new ServiceMixContainer
+
+  def init() {
+    Container.Instance.set(instance)
+  }
+
+}
\ No newline at end of file

Added: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Switchable.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Switchable.scala?rev=1151448&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Switchable.scala (added)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Switchable.scala Wed Jul 27 12:40:47 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import collection.mutable.HashMap
+import org.apache.camel.{Route, Exchange, CamelContext}
+
+trait Switchable {
+
+  def enable() {
+    global = true
+  }
+
+  def disable() {
+    global = false
+  }
+
+  def enable(camelContext: CamelContext) {
+    perContext += camelContext.getName -> true
+  }
+
+  def disable(camelContext: CamelContext) {
+    perContext += camelContext.getName -> false
+  }
+
+  def clear(camelContext: CamelContext) {
+    perContext -= camelContext.getName
+  }
+
+  def enable(route: Route) {
+    perRoute += route.getId -> true
+  }
+
+  def disable(route: Route) {
+    perRoute += route.getId -> false
+  }
+
+  def clear(route: Route) {
+    perRoute -= route.getId
+  }
+
+  def reset() {
+    global = true
+    perContext.clear()
+    perRoute.clear()
+  }
+
+  def isEnabled(exchange: Exchange) : Boolean = isRouteEnabled(exchange).getOrElse(isContextEnabled(exchange).getOrElse(global))
+  def isContextEnabled(exchange: Exchange): Option[Boolean] = perContext.get(exchange.getContext.getName)
+  def isRouteEnabled(exchange: Exchange): Option[Boolean] = if (exchange.getFromRouteId != null) perRoute.get(exchange.getFromRouteId) else Some(true)
+
+  private var global: Boolean = true
+  private val perContext = new HashMap[String, Boolean]
+  private val perRoute = new HashMap[String, Boolean]
+
+}
\ No newline at end of file

Modified: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala?rev=1151448&r1=1151447&r2=1151448&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala (original)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala Wed Jul 27 12:40:47 2011
@@ -1,5 +1,3 @@
-package org.apache.servicemix.core
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,6 +14,8 @@ package org.apache.servicemix.core
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.servicemix.core
+
 import _root_.scala.Predef._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -37,7 +37,6 @@ class BreadcrumbsTest extends FunSuite w
 
   lazy val context = {
     val result = new DefaultCamelContext()
-    result.setProcessorFactory(new GlobalProcessorFactory)
     result.addRoutes(createRouteBuilder())
     result.start()
     result
@@ -49,14 +48,23 @@ class BreadcrumbsTest extends FunSuite w
     result
   }
 
-  override protected def afterAll() = {
+
+  override protected def beforeEach() {
+    Breadcrumbs.reset()
+  }
+
+  override protected def beforeAll() {
+    ServiceMixContainer.init()
+    Breadcrumbs.register()
+  }
+
+  override protected def afterAll() {
     template.stop()
     context.stop()
+    Breadcrumbs.unregister()
   }
 
   test("add breadcrumbs to message headers") {
-    Breadcrumbs.enable(context)
-
     for (body <- messages) {
       template.sendBody("direct:test", body)
     }
@@ -77,8 +85,6 @@ class BreadcrumbsTest extends FunSuite w
   }
 
   test("bread crumb strategy can be disabled if necessary") {
-    Breadcrumbs.enable(context)
-
     for (body <- messages) {
       template.sendBody("direct:test", body)
     }
@@ -119,8 +125,6 @@ class BreadcrumbsTest extends FunSuite w
   }
 
   test("bread crumb strategy with aggregator") {
-    Breadcrumbs.enable(context)
-
     for (body <- messages) {
       template.sendBody("direct:aggregate", body)
     }
@@ -131,11 +135,11 @@ class BreadcrumbsTest extends FunSuite w
 
     val exchange = aggres.getExchanges.get(0)
     val bcs = getBreadCrumbs(exchange)
-    assert(bcs.size == messages.size, "There should be no more bread crumbs here")
+    expect(messages.size, "The number of breadcrumbs from the aggregator is wrong")(bcs.size)
   }
 
 
-  override protected def afterEach() = {
+  override protected def afterEach() {
     MockEndpoint.resetMocks(context)
     context.getProcessorFactory.asInstanceOf[GlobalProcessorFactory].factories.clear
   }

Modified: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala?rev=1151448&r1=1151447&r2=1151448&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala (original)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala Wed Jul 27 12:40:47 2011
@@ -31,14 +31,14 @@ import org.apache.camel.{Exchange, Produ
 @RunWith(classOf[JUnitRunner])
 class ProfilerStrategyTest extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
 
-  override protected def beforeAll() = {
+  override protected def beforeAll() {
   }
 
-  override protected def afterAll() = {
+  override protected def afterAll() {
 
   }
 
-  def sleep() = {
+  def sleep() {
     Thread.sleep(2)
   }
 

Added: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/SwitchableTest.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/SwitchableTest.scala?rev=1151448&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/SwitchableTest.scala (added)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/SwitchableTest.scala Wed Jul 27 12:40:47 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
+import org.apache.camel.scala.dsl.builder.{RouteBuilder, RouteBuilderSupport}
+import org.apache.camel.impl._
+
+@RunWith(classOf[JUnitRunner])
+class SwitchableTest extends FunSuite with RouteBuilderSupport with BeforeAndAfterAll with BeforeAndAfterEach {
+
+  lazy val context = {
+    val ctx = new DefaultCamelContext()
+    ctx.setName("contextId")
+    ctx.addRoutes(new RouteBuilder() { "direct:a" --> "direct:b" routeId("routeId") })
+    ctx.start()
+    ctx
+  }
+  lazy val route = context.getRoutes.get(0)
+  lazy val exchange = {
+    val ex = new DefaultExchange(context)
+    ex.setFromRouteId(route.getId)
+    ex
+  }
+  lazy val switchable = new Switchable() {}
+
+  override def beforeEach() {
+    switchable.reset()
+  }
+
+  test("global level") {
+    switchable.reset()
+    assert(switchable.isEnabled(exchange))
+
+    switchable.disable()
+    assert(!switchable.isEnabled(exchange))
+  }
+
+  test("context level") {
+    switchable.disable()
+    switchable.enable(context)
+    assert(switchable.isEnabled(exchange))
+
+    switchable.clear(context)
+    assert(!switchable.isEnabled(exchange))
+
+    switchable.enable()
+    assert(switchable.isEnabled(exchange))
+
+    switchable.disable(context)
+    assert(!switchable.isEnabled(exchange))
+  }
+
+  test("route level") {
+    switchable.disable()
+    switchable.enable(route)
+    assert(switchable.isEnabled(exchange))
+
+    switchable.clear(route)
+    assert(!switchable.isEnabled(exchange))
+
+    switchable.enable()
+    assert(switchable.isEnabled(exchange))
+
+    switchable.disable(route)
+    assert(!switchable.isEnabled(exchange))
+
+    switchable.enable(context)
+    assert(!switchable.isEnabled(exchange))
+  }
+
+}
\ No newline at end of file