You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/11/21 00:43:23 UTC

git commit: Make RPC naming convention clear for RPC users, add handler and event dispatchers

Updated Branches:
  refs/heads/javelin 17f2af409 -> d62da2a7b


Make RPC naming convention clear for RPC users, add handler and event dispatchers


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/d62da2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/d62da2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/d62da2a7

Branch: refs/heads/javelin
Commit: d62da2a7b5945eda1048ec02887c7cdef511efbd
Parents: 17f2af4
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Nov 20 15:42:51 2012 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Tue Nov 20 15:42:51 2012 -0800

----------------------------------------------------------------------
 .../framework/messaging/ComponentContainer.java    |   20 ++++-
 .../framework/messaging/ComponentEndpoint.java     |   15 ++-
 .../framework/messaging/EventDispatcher.java       |   73 +++++++++++++++
 .../framework/messaging/EventHandler.java          |   34 ++++---
 .../framework/messaging/RpcCallbackDispatcher.java |   70 ++++++++++++++
 .../framework/messaging/RpcCallbackHandler.java    |   30 ++++++
 .../framework/messaging/RpcClientCall.java         |    5 +-
 .../framework/messaging/RpcClientCallHandler.java  |   12 ---
 .../framework/messaging/RpcEndpoint.java           |   23 -----
 .../framework/messaging/RpcProvider.java           |    4 +-
 .../framework/messaging/RpcServerCallHandler.java  |   30 ------
 .../framework/messaging/RpcServiceDispatcher.java  |   70 ++++++++++++++
 .../framework/messaging/RpcServiceEndpoint.java    |   30 ++++++
 .../framework/messaging/RpcServiceHandler.java     |   30 ++++++
 .../cloudstack/framework/messaging/Subscriber.java |   34 ++++---
 .../framework/messaging/TransportEndpoint.java     |    2 +-
 .../messaging/client/ClientTransportEndpoint.java  |    2 +-
 17 files changed, 375 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
index 1d0f274..c5828ae 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java
@@ -1,5 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.cloudstack.framework.messaging;
 
 public interface ComponentContainer {
-	ComponentEndpoint wireComponent(ComponentEndpoint endpoint, String predefinedAddress);
+	ComponentEndpoint wire(ComponentEndpoint endpoint, String predefinedAddress);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
index 92443e5..b218c19 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java
@@ -18,7 +18,8 @@
  */
 package org.apache.cloudstack.framework.messaging;
 
-public class ComponentEndpoint implements RpcEndpoint, Subscriber {
+public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber {
+	
 	private TransportEndpoint transportEndpoint;
 	private RpcProvider rpcProvider;
 	
@@ -42,17 +43,19 @@ public class ComponentEndpoint implements RpcEndpoint, Subscriber {
 	}
 	
 	public void initialize() {
-		rpcProvider.registerRpcEndpoint(this);
+		rpcProvider.registerRpcServiceEndpoint(this);
 	}
 
 	@Override
-	public void onCallReceive(RpcServerCall call) {
-		// TODO Auto-generated method stub
-		// implement annotation based call dispatching
+	public boolean onCallReceive(RpcServerCall call) {
+		return RpcServiceDispatcher.dispatch(this, call);
 	}
 	
 	@Override
 	public void onPublishEvent(String subject, String senderAddress, Object args) {
-		// TODO
+		try {
+			EventDispatcher.dispatch(this, subject, senderAddress, args);
+		} catch(RuntimeException e) { // NOTE(review): dispatch failures are silently dropped here; consider logging e
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
new file mode 100644
index 0000000..ec2afb4
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Dispatches published events to the {@link EventHandler}-annotated method of a target object. */
+public class EventDispatcher {
+	private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
+	/**
+	 * Invokes the handler of {@code target} matching {@code subject} with (subject, senderAddress, args).
+	 * @return true if a handler was invoked, false if no handler matches the subject
+	 * @throws RuntimeException if the reflective call fails; the original exception is kept as the cause
+	 */
+	public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
+		assert(subject != null);
+		assert(target != null);
+		Method handler = resolveHandler(target.getClass(), subject);
+		if(handler == null)
+			return false;
+		try {
+			handler.invoke(target, subject, senderAddress, args);
+		} catch (IllegalArgumentException e) {
+			throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject, e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject, e);
+		} catch (InvocationTargetException e) {
+			throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject, e);
+		}
+		return true;
+	}
+	
+	/** Returns a public method of {@code handlerClz} whose {@link EventHandler} topic regex matches {@code subject}, or null. */
+	public static Method resolveHandler(Class<?> handlerClz, String subject) {
+		synchronized(s_handlerCache) {
+			// The cache holds one handler per class, so a hit is only valid if its topic matches this subject too.
+			Method handler = s_handlerCache.get(handlerClz);
+			if(handler != null && match(handler.getAnnotation(EventHandler.class).topic(), subject))
+				return handler;
+			for(Method method : handlerClz.getMethods()) {
+				EventHandler annotation = method.getAnnotation(EventHandler.class);
+				if(annotation != null && match(annotation.topic(), subject)) {
+					s_handlerCache.put(handlerClz, method);
+					return method;
+				}
+			}
+		}
+		return null;
+	}
+	
+	private static boolean match(String expression, String param) {
+		return param.matches(expression);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
index 6ee67c8..5ec03f1 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
@@ -1,19 +1,21 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.cloudstack.framework.messaging;
 
 import java.lang.annotation.ElementType;

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java
new file mode 100644
index 0000000..8fbe38f
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Dispatches RPC client calls to the {@link RpcCallbackHandler}-annotated method of a target object. */
+public class RpcCallbackDispatcher {
+	private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
+	
+	/**
+	 * Invokes the callback of {@code target} annotated for the command of {@code clientCall}.
+	 * @return true if a callback was invoked, false if no callback is annotated for the command
+	 * @throws RpcException if the reflective call fails (NOTE(review): the underlying exception is not attached as cause)
+	 */
+	public static boolean dispatch(Object target, RpcClientCall clientCall) {
+		assert(clientCall != null);
+		assert(target != null);
+		Method handler = resolveHandler(target.getClass(), clientCall.getCommand());
+		if(handler == null)
+			return false;
+		try {
+			handler.invoke(target, clientCall);
+		} catch (IllegalArgumentException e) {
+			throw new RpcException("IllegalArgumentException when invoking RPC callback for command: " + clientCall.getCommand());
+		} catch (IllegalAccessException e) {
+			throw new RpcException("IllegalAccessException when invoking RPC callback for command: " + clientCall.getCommand());
+		} catch (InvocationTargetException e) {
+			throw new RpcException("InvocationTargetException when invoking RPC callback for command: " + clientCall.getCommand());
+		}
+		return true;
+	}
+	
+	/** Returns a public method of {@code handlerClz} annotated with {@link RpcCallbackHandler} for {@code command}, or null. */
+	public static Method resolveHandler(Class<?> handlerClz, String command) {
+		synchronized(s_handlerCache) {
+			// The cache holds one callback per class, so a hit is only valid if it is annotated for this command.
+			Method handler = s_handlerCache.get(handlerClz);
+			if(handler != null && handler.getAnnotation(RpcCallbackHandler.class).command().equals(command))
+				return handler;
+			for(Method method : handlerClz.getMethods()) {
+				RpcCallbackHandler annotation = method.getAnnotation(RpcCallbackHandler.class);
+				if(annotation != null && annotation.command().equals(command)) {
+					s_handlerCache.put(handlerClz, method);
+					return method;
+				}
+			}
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java
new file mode 100644
index 0000000..61a2141
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface RpcCallbackHandler {
+    String command(); // command name that RpcCallbackDispatcher compares against RpcClientCall.getCommand()
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
index 5a1e9c4..a1a4bbf 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
@@ -19,8 +19,8 @@ package org.apache.cloudstack.framework.messaging;
 import java.util.concurrent.TimeUnit;
 
 public interface RpcClientCall {
+	String getCommand();
 	RpcClientCall setCommand(String cmd);
-	RpcClientCall setPipeline(String pipeline);
 	RpcClientCall setTimeout(TimeUnit timeout);
 	
 	RpcClientCall setCommandArg(Object arg);
@@ -34,5 +34,8 @@ public interface RpcClientCall {
 	void apply();
 	void cancel();
 	
+	/**
+	 * @return the result object; throws RpcException to indicate RPC failures
+	 */
 	<T> T get();
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java
deleted file mode 100644
index d695ff3..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.cloudstack.framework.messaging;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface RpcClientCallHandler {
-    String command();
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java
deleted file mode 100644
index 375c1d3..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-public interface RpcEndpoint {
-	void onCallReceive(RpcServerCall call);
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
index 547a81a..334e09d 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
@@ -22,8 +22,8 @@ public interface RpcProvider extends TransportMultiplexier {
 	void setMessageSerializer(MessageSerializer messageSerializer);
 	MessageSerializer getMessageSerializer();
 	
-	void registerRpcEndpoint(RpcEndpoint rpcEndpoint);
-	void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint);
+	void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
+	void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
 	
 	RpcClientCall target(String target);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java
deleted file mode 100644
index 73502ab..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface RpcServerCallHandler {
-    String command();
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
new file mode 100644
index 0000000..1f1d1b9
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Dispatches incoming RPC service calls to the {@link RpcServiceHandler}-annotated method of a target object. */
+public class RpcServiceDispatcher {
+	private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
+	
+	/**
+	 * Invokes the handler of {@code target} annotated for the command of {@code serviceCall}.
+	 * @return true if a handler was invoked, false if no handler is annotated for the command
+	 * @throws RpcException if the reflective call fails (NOTE(review): the underlying exception is not attached as cause)
+	 */
+	public static boolean dispatch(Object target, RpcServerCall serviceCall) {
+		assert(serviceCall != null);
+		assert(target != null);
+		Method handler = resolveHandler(target.getClass(), serviceCall.getCommand());
+		if(handler == null)
+			return false;
+		try {
+			handler.invoke(target, serviceCall);
+		} catch (IllegalArgumentException e) {
+			throw new RpcException("IllegalArgumentException when invoking RPC service command: " + serviceCall.getCommand());
+		} catch (IllegalAccessException e) {
+			throw new RpcException("IllegalAccessException when invoking RPC service command: " + serviceCall.getCommand());
+		} catch (InvocationTargetException e) {
+			throw new RpcException("InvocationTargetException when invoking RPC service command: " + serviceCall.getCommand());
+		}
+		return true;
+	}
+	
+	/** Returns a public method of {@code handlerClz} annotated with {@link RpcServiceHandler} for {@code command}, or null. */
+	public static Method resolveHandler(Class<?> handlerClz, String command) {
+		synchronized(s_handlerCache) {
+			// The cache holds one handler per class, so a hit is only valid if it is annotated for this command.
+			Method handler = s_handlerCache.get(handlerClz);
+			if(handler != null && handler.getAnnotation(RpcServiceHandler.class).command().equals(command))
+				return handler;
+			for(Method method : handlerClz.getMethods()) {
+				RpcServiceHandler annotation = method.getAnnotation(RpcServiceHandler.class);
+				if(annotation != null && annotation.command().equals(command)) {
+					s_handlerCache.put(handlerClz, method);
+					return method;
+				}
+			}
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java
new file mode 100644
index 0000000..8820139
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public interface RpcServiceEndpoint {
+	/**
+	 * Handles an incoming RPC service call.
+	 *
+	 * @param call the received call
+	 * @return true if the call has been handled, false if no handler was found for it
+	 * @throws RpcException when invoking the call handler fails
+	 */
+	boolean onCallReceive(RpcServerCall call);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java
new file mode 100644
index 0000000..435f841
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface RpcServiceHandler {
+    String command(); // command name that RpcServiceDispatcher compares against RpcServerCall.getCommand()
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java
index 9f8d460..d4fe69c 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java
@@ -1,19 +1,21 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// the License.  You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 
 package org.apache.cloudstack.framework.messaging;
 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
index 91ec86f..0996bfd 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
@@ -27,6 +27,6 @@ public interface TransportEndpoint {
 	void registerMultiplexier(String name, TransportMultiplexier multiplexier);
 	void unregisterMultiplexier(String name);
 	
-	void sendMessage(TransportEndpoint sender, String targetEndpointAddress, 
+	void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, 
 		String multiplexier, String message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/d62da2a7/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
index e12ddcf..a1c345e 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
@@ -50,7 +50,7 @@ public class ClientTransportEndpoint implements TransportEndpoint {
 	}
 
 	@Override
-	public void sendMessage(TransportEndpoint sender,
+	public void sendMessage(String sourceEndpointAddress,
 			String targetEndpointAddress, String multiplexier, String message) {
 		// TODO Auto-generated method stub
 	}