You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/12/14 04:32:01 UTC

[3/4] Refactor and finalize framework IPC java package structure

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java
deleted file mode 100644
index ff02cb8..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging.server;
-
-import org.apache.cloudstack.framework.messaging.EventBusBase;
-import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
-
-public class ServerEventBus extends EventBusBase implements TransportMultiplexier {
-
-	@Override
-	public void onTransportMessage(String senderEndpointAddress,
-			String targetEndpointAddress, String multiplexer, String message) {
-		// TODO Auto-generated method stub
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
deleted file mode 100644
index 98177d6..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging.server;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.cloudstack.framework.messaging.MessageSerializer;
-import org.apache.cloudstack.framework.messaging.TransportAddress;
-import org.apache.cloudstack.framework.messaging.TransportDataPdu;
-import org.apache.cloudstack.framework.messaging.TransportEndpoint;
-import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
-import org.apache.cloudstack.framework.messaging.TransportPdu;
-import org.apache.cloudstack.framework.messaging.TransportProvider;
-import org.apache.log4j.Logger;
-
-import com.cloud.utils.concurrency.NamedThreadFactory;
-
-public class ServerTransportProvider implements TransportProvider {
-    private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
-	
-	public static final int DEFAULT_WORKER_POOL_SIZE = 5;
-	
-	private String _nodeId;
-
-	private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
-	private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
-	private ExecutorService _executor;
-	
-	private int _nextEndpointId = new Random().nextInt();
-
-	private MessageSerializer _messageSerializer;
-	
-	public ServerTransportProvider() {
-	}
-	
-	public String getNodeId() { 
-		return _nodeId; 
-	}
-	
-	public ServerTransportProvider setNodeId(String nodeId) {
-		_nodeId = nodeId;
-		return this;
-	}
-	
-	public int getWorkerPoolSize() {
-		return _poolSize; 
-	}
-	
-	public ServerTransportProvider setWorkerPoolSize(int poolSize) {
-		assert(poolSize > 0);
-		
-		_poolSize = poolSize;
-		return this;
-	}
-	
-	@Override
-	public void setMessageSerializer(MessageSerializer messageSerializer) {
-		assert(messageSerializer != null);
-		_messageSerializer = messageSerializer;
-	}
-
-	@Override
-	public MessageSerializer getMessageSerializer() {
-		return _messageSerializer;
-	}
-	
-	public void initialize() {
-		_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
-	}
-	
-	@Override
-	public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
-		
-		TransportAddress transportAddress;
-		String endpointId;
-		if(predefinedAddress != null && !predefinedAddress.isEmpty()) {
-			endpointId = predefinedAddress;
-			transportAddress = new TransportAddress(_nodeId, endpointId, 0);
-		} else {
-			endpointId = String.valueOf(getNextEndpointId());
-			transportAddress = new TransportAddress(_nodeId, endpointId);
-		}
-		
-		TransportEndpointSite endpointSite;
-		synchronized(this) {
-			endpointSite = _endpointMap.get(endpointId);
-			if(endpointSite != null) {
-				// already attached
-				return endpointSite;
-			}
-			endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
-			_endpointMap.put(endpointId, endpointSite);
-		}
-		
-		endpoint.onAttachConfirm(true, transportAddress.toString());
-		return endpointSite;
-	}
-
-	@Override
-	public boolean detach(TransportEndpoint endpoint) {
-		synchronized(this) {
-			for(Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
-				if(entry.getValue().getEndpoint() == endpoint) {
-					_endpointMap.remove(entry.getKey());
-					return true;
-				}
-			}
-		}
-			
-		return false;
-	}
-	
-	@Override
-	public void requestSiteOutput(final TransportEndpointSite site) {
-		_executor.execute(new Runnable() {
-
-			@Override
-			public void run() {
-				try {
-					site.processOutput();
-					site.ackOutputProcessSignal();
-				} catch(Throwable e) {
-					s_logger.error("Unhandled exception", e);
-				}
-			}
-		});
-	}
-	
-	@Override
-	public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, 
-		String multiplexier, String message) {
-		
-		TransportDataPdu pdu = new TransportDataPdu();
-		pdu.setSourceAddress(sourceEndpointAddress);
-		pdu.setDestAddress(targetEndpointAddress);
-		pdu.setMultiplexier(multiplexier);
-		pdu.setContent(message);
-		
-		dispatchPdu(pdu);
-	}
-	
-	private void dispatchPdu(TransportPdu pdu) {
-		
-		TransportAddress transportAddress = TransportAddress.fromAddressString(pdu.getDestAddress());
-		
-		if(isLocalAddress(transportAddress)) {
-			TransportEndpointSite endpointSite = null;
-			synchronized(this) {
-				endpointSite = _endpointMap.get(transportAddress.getEndpointId());
-			}
-			
-			if(endpointSite != null)
-				endpointSite.addOutputPdu(pdu);
-		} else {
-			// do cross-node forwarding
-			// ???
-		}
-	}
-	
-	private boolean isLocalAddress(TransportAddress address) {
-		if(address.getNodeId().equals(_nodeId) || address.getNodeId().equals(TransportAddress.LOCAL_SERVICE_NODE))
-			return true;
-		
-		return false;
-	}
-	
-	private synchronized int getNextEndpointId() {
-		return _nextEndpointId++;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java
new file mode 100644
index 0000000..b85316e
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import org.apache.cloudstack.framework.serializer.OnwireName;
+
+/**
+ * RPC call request PDU, annotated with the on-wire name "RpcRequest".
+ * Holds a request tag, a request start tick, the command name and the
+ * serialized command argument.
+ */
+@OnwireName(name="RpcRequest")
+public class RpcCallRequestPdu {
+	
+	private long requestTag;
+	private long requestStartTick;
+	
+	private String command;
+	private String serializedCommandArg;
+
+	/**
+	 * Creates a request with tag 0 and the start tick set to the current
+	 * time in milliseconds.
+	 */
+	public RpcCallRequestPdu() {
+		requestTag = 0;
+		requestStartTick = System.currentTimeMillis();
+	}
+	
+	public long getRequestTag() {
+		return requestTag;
+	}
+	
+	public void setRequestTag(long requestTag) {
+		this.requestTag = requestTag;
+	}
+	
+	public long getRequestStartTick() {
+		return requestStartTick;
+	}
+	
+	public void setRequestStartTick(long requestStartTick) {
+		this.requestStartTick = requestStartTick;
+	}
+	
+	public String getCommand() {
+		return command;
+	}
+	
+	public void setCommand(String command) {
+		this.command = command;
+	}
+	
+	public String getSerializedCommandArg() {
+		return serializedCommandArg;
+	}
+	
+	public void setSerializedCommandArg(String serializedCommandArg) {
+		this.serializedCommandArg = serializedCommandArg;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java
new file mode 100644
index 0000000..f6cd0a0
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import org.apache.cloudstack.framework.serializer.OnwireName;
+
+/**
+ * RPC call response PDU, annotated with the on-wire name "RpcResponse".
+ * Holds the request tag and start tick, a result code (presumably one of the
+ * RESULT_* constants), the command name and the serialized result.
+ */
+@OnwireName(name="RpcResponse")
+public class RpcCallResponsePdu {
+	public static final int RESULT_SUCCESSFUL = 0;
+	public static final int RESULT_HANDLER_NOT_EXIST = 1;
+	public static final int RESULT_HANDLER_EXCEPTION = 2;
+	
+	private long requestTag;
+	private long requestStartTick;
+	
+	private int result;
+	private String command;
+	private String serializedResult;
+	
+	/** Creates a response with request tag and start tick both set to 0. */
+	public RpcCallResponsePdu() {
+		requestTag = 0;
+		requestStartTick = 0;
+	}
+
+	public long getRequestTag() {
+		return requestTag;
+	}
+	
+	public void setRequestTag(long requestTag) {
+		this.requestTag = requestTag;
+	}
+	
+	public long getRequestStartTick() {
+		return requestStartTick;
+	}
+	
+	public void setRequestStartTick(long requestStartTick) {
+		this.requestStartTick = requestStartTick;
+	}
+	
+	public int getResult() {
+		return result;
+	}
+	
+	public void setResult(int result) {
+		this.result = result;
+	}
+	
+	public String getCommand() {
+		return command;
+	}
+	
+	public void setCommand(String command) {
+		this.command = command;
+	}
+	
+	public String getSerializedResult() {
+		return serializedResult;
+	}
+	
+	public void setSerializedResult(String serializedResult) {
+		this.serializedResult = serializedResult;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java
new file mode 100644
index 0000000..c787e6a
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Dispatches RPC calls to callback methods annotated with
+ * {@link RpcCallbackHandler}. Resolved handler methods are cached per target
+ * class and command.
+ */
+public class RpcCallbackDispatcher {
+
+	// handler cache: target class -> (command -> handler method); accessed under its own monitor
+	private static Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
+	
+	/**
+	 * Invokes the callback handler that the target's class declares for the
+	 * command of the given call, passing the call as the only argument.
+	 *
+	 * @param target object whose class declares the handler method
+	 * @param clientCall call handed over to the handler
+	 * @return true if a handler was found and invoked, false if there is none
+	 * @throws RpcException if the reflective invocation fails; the underlying
+	 *         exception is attached as the cause
+	 */
+	public static boolean dispatch(Object target, RpcClientCall clientCall) {
+		assert(clientCall != null);
+		assert(target != null);
+		
+		Method handler = resolveHandler(target.getClass(), clientCall.getCommand());
+		if(handler == null)
+			return false;
+		
+		try {
+			handler.invoke(target, clientCall);
+		} catch (IllegalArgumentException e) {
+			throw new RpcException("IllegalArgumentException when invoking RPC callback for command: " + clientCall.getCommand(), e);
+		} catch (IllegalAccessException e) {
+			throw new RpcException("IllegalAccessException when invoking RPC callback for command: " + clientCall.getCommand(), e);
+		} catch (InvocationTargetException e) {
+			// keep the wrapper as cause: the exception thrown by the handler itself stays reachable through e.getCause()
+			throw new RpcException("InvocationTargetException when invoking RPC callback for command: " + clientCall.getCommand(), e);
+		}
+		
+		return true;
+	}
+	
+	/**
+	 * Looks up the method declared by handlerClz that carries a
+	 * {@link RpcCallbackHandler} annotation for the given command. A match is
+	 * made accessible and cached; misses are not cached.
+	 *
+	 * @return the handler method, or null if no declared method matches
+	 */
+	public static Method resolveHandler(Class<?> handlerClz, String command) {
+		synchronized(s_handlerCache) {
+			Map<String, Method> handlerMap = getAndSetHandlerMap(handlerClz);
+				
+			Method handler = handlerMap.get(command);
+			if(handler != null)
+				return handler;
+			
+			for(Method method : handlerClz.getDeclaredMethods()) {
+				RpcCallbackHandler annotation = method.getAnnotation(RpcCallbackHandler.class);
+				if(annotation != null) {
+					if(annotation.command().equals(command)) {
+						method.setAccessible(true);
+						handlerMap.put(command, method);
+						return method;
+					}
+				}
+			}
+		}
+		
+		return null;
+	}
+	
+	/** Returns the cached command-to-handler map of the class, creating and storing an empty one on first use. */
+	private static Map<String, Method> getAndSetHandlerMap(Class<?> handlerClz) {
+		Map<String, Method> handlerMap;
+		synchronized(s_handlerCache) {
+			handlerMap = s_handlerCache.get(handlerClz);
+			
+			if(handlerMap == null) {
+				handlerMap = new HashMap<String, Method>();
+				s_handlerCache.put(handlerClz, handlerMap);
+			}
+		}
+		
+		return handlerMap;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java
new file mode 100644
index 0000000..86dfceb
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method as the RPC callback handler for a command.
+ * Retained at runtime and applicable to methods only.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface RpcCallbackHandler {
+    /** @return the command name carried by this annotation */
+    String command();
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java
new file mode 100644
index 0000000..0ab94ac
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * Listener notified about the outcome of an RPC call.
+ *
+ * @param <T> type of the result object passed to {@link #onSuccess(Object)}
+ */
+public interface RpcCallbackListener<T> {
+	/** Called with the result object when the call succeeds. */
+	void onSuccess(T result);
+	/** Called with the failure when the call fails. */
+	void onFailure(RpcException e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java
new file mode 100644
index 0000000..7a7e45c
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * Client-side handle of an RPC call. The configuration methods return a
+ * RpcClientCall so that invocations can be chained.
+ */
+public interface RpcClientCall {
+	// default RPC timeout; RpcClientCallImpl uses it as its initial timeout in milliseconds
+	final static int DEFAULT_RPC_TIMEOUT = 10000;
+	
+	String getCommand();
+	RpcClientCall setCommand(String cmd);
+	RpcClientCall setTimeout(int timeoutMilliseconds);
+	
+	RpcClientCall setCommandArg(Object arg);
+	Object getCommandArg();
+	
+	RpcClientCall setContextParam(String key, Object param);
+	<T> T getContextParam(String key);
+	
+	<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
+	RpcClientCall setCallbackDispatcherTarget(Object target);
+	
+	RpcClientCall setOneway();
+	
+	RpcClientCall apply();
+	void cancel();
+	
+	/**
+	 * @return the result object, it may also throw RpcException to indicate RPC failures 
+	 */
+	<T> T get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java
new file mode 100644
index 0000000..1db878d
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * RpcClientCall implementation backed by a RpcProvider. A call is configured
+ * through the chained setters, sent with apply(), and completed by one of the
+ * complete() overloads, which wakes up get() and notifies the listeners or the
+ * callback dispatcher target.
+ */
+public class RpcClientCallImpl implements RpcClientCall {
+
+	private String _command;
+	private Object _commandArg;
+	
+	private int _timeoutMilliseconds = DEFAULT_RPC_TIMEOUT;
+	private Map<String, Object> _contextParams = new HashMap<String, Object>();
+	private boolean _oneway = false;
+	
+	@SuppressWarnings("rawtypes")
+	private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
+	private Object _callbackDispatcherTarget;
+	
+	private RpcProvider _rpcProvider;
+	// reference point of the get() timeout; reset when apply() sends the call
+	private long _startTickInMs = System.currentTimeMillis();
+	private long _callTag;
+	private String _sourceAddress;
+	private String _targetAddress;
+	
+	// guards _responseDone and _responseResult
+	private Object _responseLock = new Object();
+	private boolean _responseDone = false;
+	private Object _responseResult;
+		
+	public RpcClientCallImpl(RpcProvider rpcProvider) {
+		assert(rpcProvider != null);
+		_rpcProvider = rpcProvider;
+	}
+	
+	@Override
+	public String getCommand() {
+		return _command;
+	}
+
+	@Override
+	public RpcClientCall setCommand(String cmd) {
+		_command = cmd;
+		return this;
+	}
+
+	@Override
+	public RpcClientCall setTimeout(int timeoutMilliseconds) {
+		_timeoutMilliseconds = timeoutMilliseconds;
+		return this;
+	}
+
+	@Override
+	public RpcClientCall setCommandArg(Object arg) {
+		_commandArg = arg;
+		return this;
+	}
+
+	@Override
+	public Object getCommandArg() {
+		return _commandArg;
+	}
+
+	@Override
+	public RpcClientCall setContextParam(String key, Object param) {
+		assert(key != null);
+		_contextParams.put(key, param);
+		return this;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <T> T getContextParam(String key) {
+		return (T)_contextParams.get(key);
+	}
+
+	@Override
+	public <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener) {
+		assert(listener != null);
+		_callbackListeners.add(listener);
+		return this;
+	}
+	
+	@Override
+	public RpcClientCall setCallbackDispatcherTarget(Object target) {
+		_callbackDispatcherTarget = target;
+		return this;
+	}
+	
+
+	@Override
+	public RpcClientCall setOneway() {
+		_oneway = true;
+		return this;
+	}
+	
+	public String getSourceAddress() {
+		return _sourceAddress;
+	}
+	
+	public void setSourceAddress(String sourceAddress) {
+		_sourceAddress = sourceAddress;
+	}
+	
+	public String getTargetAddress() {
+		return _targetAddress;
+	}
+	
+	public void setTargetAddress(String targetAddress) {
+		_targetAddress = targetAddress;
+	}
+	
+	public long getCallTag() {
+		return _callTag;
+	}
+	
+	public void setCallTag(long callTag) {
+		_callTag = callTag;
+	}
+
+	/**
+	 * Sends the call. Two-way calls are registered with the provider first, and
+	 * the timeout clock used by get() starts here.
+	 */
+	@Override
+	public RpcClientCall apply() {
+		// sanity check
+		assert(_sourceAddress != null);
+		assert(_targetAddress != null);
+		
+		// start the timeout clock used by get()
+		_startTickInMs = System.currentTimeMillis();
+		
+		if(!_oneway)
+			_rpcProvider.registerCall(this);
+		
+		RpcCallRequestPdu pdu = new RpcCallRequestPdu();
+		pdu.setCommand(getCommand());
+		if(_commandArg != null)
+			pdu.setSerializedCommandArg(_rpcProvider.getMessageSerializer().serializeTo(_commandArg.getClass(), _commandArg));
+		pdu.setRequestTag(this.getCallTag());
+		
+		_rpcProvider.sendRpcPdu(getSourceAddress(), getTargetAddress(), 
+			_rpcProvider.getMessageSerializer().serializeTo(RpcCallRequestPdu.class, pdu));
+		
+		return this;
+	}
+
+	@Override
+	public void cancel() {
+		_rpcProvider.cancelCall(this);
+	}
+
+	/**
+	 * Waits for the response of a two-way call.
+	 *
+	 * @return the deserialized result, or null for one-way calls and for calls
+	 *         completed with a null result
+	 * @throws RpcTimeoutException if no response arrives within the timeout, or
+	 *         if the waiting thread is interrupted (its interrupt flag is restored)
+	 * @throws RpcException if the call was completed with a failure
+	 */
+	@Override
+	public <T> T get() {
+		if(_oneway)
+			return null;
+		
+		synchronized(_responseLock) {
+			// loop instead of a single wait: wait() may wake up spuriously, and
+			// wait(0) would block forever, so an expired deadline must be
+			// detected before waiting
+			while(!_responseDone) {
+				long timeToWait = _timeoutMilliseconds - (System.currentTimeMillis() - _startTickInMs);
+				if(timeToWait <= 0)
+					throw new RpcTimeoutException("RPC call timed out");
+				
+				try {
+					_responseLock.wait(timeToWait);
+				} catch (InterruptedException e) {
+					// restore the flag so callers up the stack can still see the interrupt
+					Thread.currentThread().interrupt();
+					throw new RpcTimeoutException("RPC call interrupted while waiting for response");
+				}
+			}
+			
+			if(_responseResult == null)
+				return null;
+			
+			if(_responseResult instanceof RpcException)
+				throw (RpcException)_responseResult;
+			
+			assert(_rpcProvider.getMessageSerializer() != null);
+			assert(_responseResult instanceof String);
+			return _rpcProvider.getMessageSerializer().serializeFrom((String)_responseResult);
+		}
+	}
+	
+	/**
+	 * Completes the call with a serialized result: wakes up threads blocked in
+	 * get(), then notifies the listeners with the deserialized result or, when
+	 * no listener is registered, hands the call to the callback dispatcher target.
+	 */
+	@SuppressWarnings("unchecked")
+	public void complete(String result) {
+		// publish result and done flag together under the lock get() waits on
+		synchronized(_responseLock) {
+			_responseResult = result;
+			_responseDone = true;
+			_responseLock.notifyAll();
+		}
+	
+		if(_callbackListeners.size() > 0) {
+			assert(_rpcProvider.getMessageSerializer() != null);
+			Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
+			for(@SuppressWarnings("rawtypes") RpcCallbackListener listener: _callbackListeners)
+				listener.onSuccess(resultObject);
+		} else {
+			if(_callbackDispatcherTarget != null)
+				RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+		}
+	}
+	
+	/**
+	 * Completes the call with a failure: wakes up threads blocked in get(), then
+	 * notifies the listeners or, when no listener is registered, hands the call
+	 * to the callback dispatcher target.
+	 */
+	public void complete(RpcException e) {
+		synchronized(_responseLock) {
+			_responseResult = e;
+			_responseDone = true;
+			_responseLock.notifyAll();
+		}
+		
+		if(_callbackListeners.size() > 0) {
+			for(@SuppressWarnings("rawtypes") RpcCallbackListener listener: _callbackListeners)
+				listener.onFailure(e);
+		} else {
+			if(_callbackDispatcherTarget != null)
+				RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java
new file mode 100644
index 0000000..618e6ab
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+/**
+ * Unchecked exception of the RPC package; extends RuntimeException.
+ */
+public class RpcException extends RuntimeException {
+	private static final long serialVersionUID = -3164514701087423787L;
+
+	public RpcException() {
+		super();
+	}
+	
+	public RpcException(String message) {
+		super(message);
+	}
+	
+	public RpcException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java
new file mode 100644
index 0000000..8479e38
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * {@link RpcException} subtype; judging by its name it is meant for I/O-level
+ * RPC failures (no throw site is visible alongside this class - TODO confirm).
+ */
+public class RpcIOException extends RpcException {
+
+	private static final long serialVersionUID = -6108039302920641533L;
+
+	/**
+	 * Creates an exception that carries neither a detail message nor a cause.
+	 */
+	public RpcIOException() {
+	}
+
+	/**
+	 * @param message detail message describing the failure
+	 */
+	public RpcIOException(String message) {
+		super(message);
+	}
+
+	/**
+	 * @param message detail message describing the failure
+	 * @param cause throwable that led to this exception
+	 */
+	public RpcIOException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java
new file mode 100644
index 0000000..fb4f04b
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportAddressMapper;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+/**
+ * RPC layer contract: manages the message serializer, the registered service
+ * endpoints and the client calls, and sends RPC PDUs over the transport.
+ * As a {@link TransportMultiplexier} it also receives the transport messages
+ * addressed to the RPC multiplexier.
+ */
+public interface RpcProvider extends TransportMultiplexier {
+	/** Multiplexier name used for RPC traffic on the transport. */
+	String RPC_MULTIPLEXIER = "rpc";
+
+	/** Sets the serializer used to encode and decode RPC PDUs and call payloads. */
+	void setMessageSerializer(MessageSerializer messageSerializer);
+
+	/** @return the serializer previously set, or null if none was set */
+	MessageSerializer getMessageSerializer();
+
+	/** @return true when the provider was initialized, false otherwise */
+	boolean initialize();
+
+	/** Adds an endpoint that will be offered incoming call requests. */
+	void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
+
+	/** Removes an endpoint added through {@link #registerRpcServiceEndpoint}. */
+	void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
+
+	/** Creates a call without an explicit target; the implementation picks the target address. */
+	RpcClientCall newCall();
+
+	/** Creates a call addressed to the given transport address. */
+	RpcClientCall newCall(String targetAddress);
+
+	/** Creates a call addressed to the address supplied by the mapper. */
+	RpcClientCall newCall(TransportAddressMapper targetAddress);
+
+	//
+	// low-level public API
+	//
+
+	/** Starts tracking a call so that a later response can be matched to it. */
+	void registerCall(RpcClientCall call);
+
+	/** Stops tracking a call before its response has arrived. */
+	void cancelCall(RpcClientCall call);
+
+	/** Sends an already serialized RPC PDU from sourceAddress to targetAddress. */
+	void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java
new file mode 100644
index 0000000..7f73e60
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportAddress;
+import org.apache.cloudstack.framework.transport.TransportAddressMapper;
+import org.apache.cloudstack.framework.transport.TransportEndpoint;
+import org.apache.cloudstack.framework.transport.TransportEndpointSite;
+import org.apache.cloudstack.framework.transport.TransportProvider;
+
+/**
+ * {@link RpcProvider} implementation layered on a {@link TransportProvider}.
+ *
+ * Incoming transport messages are deserialized into RPC PDUs. Call requests are
+ * offered to the registered {@link RpcServiceEndpoint}s in registration order;
+ * call responses complete the matching outstanding {@link RpcClientCall}.
+ */
+public class RpcProviderImpl implements RpcProvider {
+	// same value as RpcProvider.RPC_MULTIPLEXIER; kept because it is a public member of this class
+	public static final String RPC_MULTIPLEXIER = "rpc";
+
+	private TransportProvider _transportProvider;
+	// written by the attach/detach callbacks of RpcTransportEndpoint, read when creating calls
+	private String _transportAddress;
+	private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint();	// transport attachment at RPC layer
+
+	private MessageSerializer _messageSerializer;
+	// accessed only while synchronized on the list itself
+	private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
+	// call tag -> call awaiting its response; accessed only while synchronized on this
+	private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
+
+	// next tag to hand out; accessed only through the synchronized getNextCallTag()
+	private long _nextCallTag = System.currentTimeMillis();
+
+	/**
+	 * Creates a provider without transport; {@link #setTransportProvider} must be
+	 * called before {@link #initialize()}.
+	 */
+	public RpcProviderImpl() {
+	}
+
+	/**
+	 * @param transportProvider transport used to attach the RPC endpoint and to send PDUs
+	 */
+	public RpcProviderImpl(TransportProvider transportProvider) {
+		_transportProvider = transportProvider;
+	}
+
+	public TransportProvider getTransportProvider() {
+		return _transportProvider;
+	}
+
+	public void setTransportProvider(TransportProvider transportProvider) {
+		_transportProvider = transportProvider;
+	}
+
+	/**
+	 * Deserializes the message and routes it by PDU type: call requests go to the
+	 * service endpoints, call responses to the matching outstanding client call.
+	 */
+	@Override
+	public void onTransportMessage(String senderEndpointAddress,
+		String targetEndpointAddress, String multiplexer, String message) {
+		assert(_messageSerializer != null);
+
+		Object pdu = _messageSerializer.serializeFrom(message);
+		if(pdu instanceof RpcCallRequestPdu) {
+			handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu);
+		} else if(pdu instanceof RpcCallResponsePdu) {
+			handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu);
+		} else {
+			assert(false);
+		}
+	}
+
+	@Override
+	public void setMessageSerializer(MessageSerializer messageSerializer) {
+		assert(messageSerializer != null);
+		_messageSerializer = messageSerializer;
+	}
+
+	@Override
+	public MessageSerializer getMessageSerializer() {
+		return _messageSerializer;
+	}
+
+	/**
+	 * Attaches the RPC transport endpoint under the name "RpcProvider" and registers
+	 * this provider as the handler of the RPC multiplexier.
+	 *
+	 * @return false when no transport provider has been set, true otherwise
+	 */
+	@Override
+	public boolean initialize() {
+		assert(_transportProvider != null);
+		if(_transportProvider == null)
+			return false;
+
+		TransportEndpointSite endpointSite = _transportProvider.attach(_transportEndpoint, "RpcProvider");
+		endpointSite.registerMultiplexier(RPC_MULTIPLEXIER, this);
+		return true;
+	}
+
+	@Override
+	public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
+		synchronized(_serviceEndpoints) {
+			_serviceEndpoints.add(rpcEndpoint);
+		}
+	}
+
+	@Override
+	public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
+		synchronized(_serviceEndpoints) {
+			_serviceEndpoints.remove(rpcEndpoint);
+		}
+	}
+
+	/**
+	 * Creates a call addressed to the local predefined "RpcProvider" transport address.
+	 */
+	@Override
+	public RpcClientCall newCall() {
+		return newCall(TransportAddress.getLocalPredefinedTransportAddress("RpcProvider").toString());
+	}
+
+	/**
+	 * Creates a call with a fresh call tag, this provider's transport address as
+	 * source and the given target. The call is not tracked until
+	 * {@link #registerCall} is invoked.
+	 */
+	@Override
+	public RpcClientCall newCall(String targetAddress) {
+
+		long callTag = getNextCallTag();
+		RpcClientCallImpl call = new RpcClientCallImpl(this);
+		call.setSourceAddress(_transportAddress);
+		call.setTargetAddress(targetAddress);
+		call.setCallTag(callTag);
+
+		return call;
+	}
+
+	@Override
+	public RpcClientCall newCall(TransportAddressMapper targetAddress) {
+		return newCall(targetAddress.getAddress());
+	}
+
+	/**
+	 * Records the call under its call tag so that a response can be matched to it.
+	 */
+	@Override
+	public void registerCall(RpcClientCall call) {
+		assert(call != null);
+		synchronized(this) {
+			_outstandingCalls.put(((RpcClientCallImpl)call).getCallTag(), call);
+		}
+	}
+
+	/**
+	 * Stops tracking the call and completes it with an {@link RpcException}.
+	 */
+	@Override
+	public void cancelCall(RpcClientCall call) {
+		synchronized(this) {
+			_outstandingCalls.remove(((RpcClientCallImpl)call).getCallTag());
+		}
+
+		// completed outside the lock so call completion never runs while holding it
+		((RpcClientCallImpl)call).complete(new RpcException("Call is cancelled"));
+	}
+
+	@Override
+	public void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu) {
+		assert(_transportProvider != null);
+		_transportProvider.sendMessage(sourceAddress, targetAddress, RpcProvider.RPC_MULTIPLEXIER, serializedPdu);
+	}
+
+	/**
+	 * Hands out the next call tag. The value 0 is never returned.
+	 *
+	 * @return a tag different from the one returned by the previous invocation
+	 */
+	protected synchronized long getNextCallTag() {
+		long tag = _nextCallTag++;
+		if(tag == 0) {
+			// take the following counter value instead of adjusting only the local copy;
+			// otherwise the same tag (1) would be returned again by the next invocation
+			tag = _nextCallTag++;
+		}
+
+		return tag;
+	}
+
+	/**
+	 * Offers the request to each registered endpoint until one reports it as handled.
+	 * When no endpoint handles it, or an endpoint throws, a response carrying only
+	 * the corresponding result code is sent back to the requester.
+	 */
+	private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) {
+		boolean handlerFailed = false;
+		try {
+			RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
+
+			// TODO, we are trying to avoid locking when calling into callbacks
+			// this can be optimized later
+			List<RpcServiceEndpoint> endpoints = new ArrayList<RpcServiceEndpoint>();
+			synchronized(_serviceEndpoints) {
+				endpoints.addAll(_serviceEndpoints);
+			}
+
+			for(RpcServiceEndpoint endpoint : endpoints) {
+				if(endpoint.onCallReceive(call))
+					return;
+			}
+		} catch (Throwable e) {
+			// the failure is reported to the requester through the result code below
+			handlerFailed = true;
+		}
+
+		// the response is sent outside the try block so that a transport failure while
+		// sending is not mistaken for a handler failure and answered a second time
+		RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
+		responsePdu.setCommand(pdu.getCommand());
+		responsePdu.setRequestStartTick(pdu.getRequestStartTick());
+		responsePdu.setRequestTag(pdu.getRequestTag());
+		if(handlerFailed)
+			responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION);
+		else
+			responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST);
+
+		sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+	}
+
+	/**
+	 * Completes the outstanding call whose tag matches the response. Responses
+	 * without a matching call (for example after cancellation) are ignored.
+	 */
+	private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) {
+		RpcClientCallImpl call = null;
+
+		synchronized(this) {
+			call = (RpcClientCallImpl)_outstandingCalls.remove(pdu.getRequestTag());
+		}
+
+		if(call != null) {
+			switch(pdu.getResult()) {
+			case RpcCallResponsePdu.RESULT_SUCCESSFUL :
+				call.complete(pdu.getSerializedResult());
+				break;
+
+			case RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST :
+				call.complete(new RpcException("Handler does not exist"));
+				break;
+
+			case RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION :
+				call.complete(new RpcException("Exception in handler"));
+				break;
+
+			default :
+				// the call is no longer tracked at this point, so it has to be completed
+				// here; otherwise its caller would never be notified
+				call.complete(new RpcException("Unexpected RPC result code: " + pdu.getResult()));
+				break;
+			}
+		}
+	}
+
+	/**
+	 * Transport-level endpoint of the RPC layer; it only tracks the transport
+	 * address assigned to this provider.
+	 */
+	private class RpcTransportEndpoint implements TransportEndpoint {
+
+		@Override
+		public void onTransportMessage(String senderEndpointAddress,
+			String targetEndpointAddress, String multiplexer, String message) {
+
+			// we won't handle generic transport message toward RPC transport endpoint
+		}
+
+		@Override
+		public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
+			if(bSuccess)
+				_transportAddress = endpointAddress;
+		}
+
+		@Override
+		public void onDetachIndication(String endpointAddress) {
+			// forget the address only when the detach concerns the address we hold
+			if(_transportAddress != null && _transportAddress.equals(endpointAddress))
+				_transportAddress = null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java
new file mode 100644
index 0000000..a102503
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+/**
+ * Receiver-side view of one incoming RPC call, handed to service endpoints.
+ */
+public interface RpcServerCall {
+	/** @return the command name carried by the call request */
+	String getCommand();
+
+	/**
+	 * @return the deserialized argument of the call; RpcServerCallImpl returns
+	 *         null when the request carries no argument
+	 */
+	<T> T getCommandArgument();
+
+	/**
+	 * Lets the receiver respond to the call.
+	 *
+	 * @param returnObject result sent back to the caller
+	 */
+	void completeCall(Object returnObject);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java
new file mode 100644
index 0000000..d1ac7a9
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * {@link RpcServerCall} backed by a received {@link RpcCallRequestPdu}. The
+ * response is sent through the {@link RpcProvider} that created this call.
+ */
+public class RpcServerCallImpl implements RpcServerCall {
+
+	private final RpcProvider _rpcProvider;
+	private final String _sourceAddress;	// address of the requester
+	private final String _targetAddress;	// address the request was sent to
+
+	private final RpcCallRequestPdu _requestPdu;
+
+	/**
+	 * @param provider provider used for serialization and for sending the response
+	 * @param sourceAddress transport address the request came from
+	 * @param targetAddress transport address the request was addressed to
+	 * @param requestPdu the received call request
+	 */
+	public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress, 
+		RpcCallRequestPdu requestPdu) {
+
+		_rpcProvider = provider;
+		_sourceAddress = sourceAddress;
+		_targetAddress = targetAddress;
+		_requestPdu = requestPdu;
+	}
+
+	@Override
+	public String getCommand() {
+		assert(_requestPdu != null);
+		return _requestPdu.getCommand();
+	}
+
+	/**
+	 * @return the deserialized command argument, or null when the request carries none
+	 */
+	@Override
+	public <T> T getCommandArgument() {
+		if(_requestPdu.getSerializedCommandArg() == null)
+			return null;
+
+		assert(_rpcProvider.getMessageSerializer() != null);
+		return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg());
+	}
+
+	/**
+	 * Sends a successful response for this call back to the requester.
+	 *
+	 * @param returnObject result of the call; when null the response carries no serialized result
+	 */
+	@Override
+	public void completeCall(Object returnObject) {
+		assert(_sourceAddress != null);
+		assert(_targetAddress != null);
+
+		RpcCallResponsePdu pdu = new RpcCallResponsePdu();
+		pdu.setCommand(_requestPdu.getCommand());
+		pdu.setRequestTag(_requestPdu.getRequestTag());
+		pdu.setRequestStartTick(_requestPdu.getRequestStartTick());
+		// the result code belongs in the result field; writing it into the start tick
+		// would overwrite the tick copied above and leave the result unset
+		pdu.setResult(RpcCallResponsePdu.RESULT_SUCCESSFUL);
+		if(returnObject != null) {
+			assert(_rpcProvider.getMessageSerializer() != null);
+			pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject));
+		}
+
+		// the response travels in the opposite direction of the request
+		_rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress, 
+			_rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java
new file mode 100644
index 0000000..c0d1566
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * {@link RpcServiceEndpoint} that routes a call to the method of a target object
+ * annotated with {@link RpcServiceHandler} for the call's command. Resolved
+ * handler methods are cached per target class.
+ */
+public class RpcServiceDispatcher implements RpcServiceEndpoint {
+
+	// handler class -> (command -> handler method); accessed only while synchronized on the map
+	private static Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
+
+	// target object -> its dispatcher; accessed only while synchronized on the map
+	private static Map<Object, RpcServiceDispatcher> s_targetMap = new HashMap<Object, RpcServiceDispatcher>();
+	private Object _targetObject;
+
+	/**
+	 * @param targetObject object whose annotated methods handle the calls
+	 */
+	public RpcServiceDispatcher(Object targetObject) {
+		_targetObject = targetObject;
+	}
+
+	/**
+	 * Returns the dispatcher registered for the target, creating and registering
+	 * one on first use.
+	 */
+	public static RpcServiceDispatcher getDispatcher(Object targetObject) {
+		RpcServiceDispatcher dispatcher;
+		synchronized(s_targetMap) {
+			dispatcher = s_targetMap.get(targetObject);
+			if(dispatcher == null) {
+				dispatcher = new RpcServiceDispatcher(targetObject);
+				s_targetMap.put(targetObject, dispatcher);
+			}
+		}
+		return dispatcher;
+	}
+
+	/**
+	 * Forgets the dispatcher registered for the target, if any.
+	 */
+	public static void removeDispatcher(Object targetObject) {
+		synchronized(s_targetMap) {
+			s_targetMap.remove(targetObject);
+		}
+	}
+
+	/**
+	 * Invokes the handler method of the target for the call's command, passing the
+	 * call as the only argument.
+	 *
+	 * @return true when a handler was found and invoked, false when the target
+	 *         declares no handler for the command
+	 * @throws RpcException when the reflective invocation fails or the handler
+	 *         itself throws; the original exception is attached as the cause
+	 */
+	public static boolean dispatch(Object target, RpcServerCall serviceCall) {
+		assert(serviceCall != null);
+		assert(target != null);
+
+		Method handler = resolveHandler(target.getClass(), serviceCall.getCommand());
+		if(handler == null)
+			return false;
+
+		try {
+			handler.invoke(target, serviceCall);
+		} catch (IllegalArgumentException e) {
+			throw new RpcException("IllegalArgumentException when invoking RPC service command: " + serviceCall.getCommand(), e);
+		} catch (IllegalAccessException e) {
+			throw new RpcException("IllegalAccessException when invoking RPC service command: " + serviceCall.getCommand(), e);
+		} catch (InvocationTargetException e) {
+			// e wraps whatever the handler threw; keeping it preserves the handler's stack trace
+			throw new RpcException("InvocationTargetException when invoking RPC service command: " + serviceCall.getCommand(), e);
+		}
+
+		return true;
+	}
+
+	/**
+	 * Finds the method declared by handlerClz (inherited methods are not searched)
+	 * whose {@link RpcServiceHandler} annotation names the command. Matches are
+	 * made accessible and cached.
+	 *
+	 * @return the handler method, or null when the class declares none for the command
+	 */
+	public static Method resolveHandler(Class<?> handlerClz, String command) {
+		synchronized(s_handlerCache) {
+			Map<String, Method> handlerMap = getAndSetHandlerMap(handlerClz);
+
+			Method handler = handlerMap.get(command);
+			if(handler != null)
+				return handler;
+
+			for(Method method : handlerClz.getDeclaredMethods()) {
+				RpcServiceHandler annotation = method.getAnnotation(RpcServiceHandler.class);
+				if(annotation != null) {
+					if(annotation.command().equals(command)) {
+						// handlers need not be public
+						method.setAccessible(true);
+						handlerMap.put(command, method);
+						return method;
+					}
+				}
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Returns the command-to-method map cached for the class, creating an empty
+	 * one on first use.
+	 */
+	private static Map<String, Method> getAndSetHandlerMap(Class<?> handlerClz) {
+		Map<String, Method> handlerMap;
+		synchronized(s_handlerCache) {
+			handlerMap = s_handlerCache.get(handlerClz);
+
+			if(handlerMap == null) {
+				handlerMap = new HashMap<String, Method>();
+				s_handlerCache.put(handlerClz, handlerMap);
+			}
+		}
+
+		return handlerMap;
+	}
+
+	@Override
+	public boolean onCallReceive(RpcServerCall call) {
+		return dispatch(_targetObject, call);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java
new file mode 100644
index 0000000..31dc083
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * Receiver of incoming RPC calls. Endpoints are registered with an RpcProvider,
+ * which offers each incoming call request to them.
+ */
+public interface RpcServiceEndpoint {
+	/**
+	 * Offers an incoming call to this endpoint.
+	 *
+	 * @param call the incoming call
+	 * @return true when the call has been handled, false when this endpoint has
+	 *         no handler for the call
+	 * @throws RpcException when invoking the handler fails (RpcServiceDispatcher
+	 *         throws it when the reflective invocation of the handler fails)
+	 */
+	boolean onCallReceive(RpcServerCall call);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java
new file mode 100644
index 0000000..6a77f93
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method as the handler of one RPC command. The annotation is kept at
+ * runtime because RpcServiceDispatcher looks handlers up reflectively.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcServiceHandler {
+    /** @return name of the RPC command the annotated method handles */
+    String command();
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java
new file mode 100644
index 0000000..5c876c7
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * {@link RpcException} subtype; judging by its name it signals that an RPC call
+ * timed out (no throw site is visible alongside this class - TODO confirm).
+ */
+public class RpcTimeoutException extends RpcException {
+
+	private static final long serialVersionUID = -3618654987984665833L;
+
+	/**
+	 * Creates an exception that carries neither a detail message nor a cause.
+	 */
+	public RpcTimeoutException() {
+	}
+
+	/**
+	 * @param message detail message describing the failure
+	 */
+	public RpcTimeoutException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java
new file mode 100644
index 0000000..2fcab54
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.serializer;
+
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * {@link MessageSerializer} producing messages of the form
+ * {@code <onwire name>|<json>}: the on-wire name comes from the class's
+ * {@link OnwireName} annotation and the JSON body from Gson.
+ */
+public class JsonMessageSerializer implements MessageSerializer {
+
+	// replaceable from outside so that upper layer applications can install
+	// the type adapters they need
+	private Gson _gson;
+
+	// maps on-wire names back to classes when deserializing
+	private OnwireClassRegistry _clzRegistry;
+
+	/**
+	 * Creates a serializer with a default Gson instance (version 1.5) and no
+	 * class registry; a registry has to be set before {@link #serializeFrom}.
+	 */
+	public JsonMessageSerializer() {
+		_gson = new GsonBuilder().setVersion(1.5).create();
+	}
+
+	public Gson getGson() {
+		return _gson;
+	}
+
+	public void setGson(Gson gson) {
+		_gson = gson;
+	}
+
+	public OnwireClassRegistry getOnwireClassRegistry() {
+		return _clzRegistry;
+	}
+
+	public void setOnwireClassRegistry(OnwireClassRegistry clzRegistry) {
+		_clzRegistry = clzRegistry;
+	}
+
+	/**
+	 * @param clz class whose {@link OnwireName} annotation supplies the on-wire name
+	 * @param object object to encode as JSON
+	 * @return the on-wire name, a '|' separator and the JSON form of the object
+	 * @throws RuntimeException when clz carries no {@link OnwireName} annotation
+	 */
+	@Override
+	public <T> String serializeTo(Class<?> clz, T object) {
+		assert(clz != null);
+		assert(object != null);
+
+		OnwireName onwireAnnotation = clz.getAnnotation(OnwireName.class);
+		if(onwireAnnotation == null)
+			throw new RuntimeException("Class " + clz.getCanonicalName() + " is not declared to be onwire");
+
+		return onwireAnnotation.name() + "|" + _gson.toJson(object);
+	}
+
+	/**
+	 * @param message text in the form produced by {@link #serializeTo}
+	 * @return the object decoded from the JSON part, typed by the class registered
+	 *         under the message's on-wire name
+	 * @throws RuntimeException when the message has no '|' separator or the
+	 *         on-wire name is not registered
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public <T> T serializeFrom(String message) {
+		assert(message != null);
+
+		// everything before the first '|' is the on-wire name, the rest is JSON
+		int separatorIndex = message.indexOf('|');
+		if(separatorIndex < 0)
+			throw new RuntimeException("Invalid on-wire message format");
+
+		String onwireName = message.substring(0, separatorIndex);
+		Class<?> onwireClass = _clzRegistry.getOnwireClass(onwireName);
+		if(onwireClass == null)
+			throw new RuntimeException("Onwire class is not registered. name: " + onwireName);
+
+		String jsonBody = message.substring(separatorIndex + 1);
+		return (T)_gson.fromJson(jsonBody, onwireClass);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java
new file mode 100644
index 0000000..65d818e
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.serializer;
+
+/**
+ * Converts objects to and from their textual on-wire representation.
+ */
+public interface MessageSerializer {
+	/**
+	 * Encodes an object into its on-wire text.
+	 *
+	 * @param clz class under which the object is declared on the wire
+	 * @param object object to encode
+	 * @return the on-wire text
+	 */
+	<T> String serializeTo(Class<?> clz, T object);
+
+	/**
+	 * Decodes on-wire text back into an object (despite the name, this is the
+	 * deserializing direction).
+	 *
+	 * @param message on-wire text
+	 * @return the decoded object
+	 */
+	<T> T serializeFrom(String message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java
new file mode 100644
index 0000000..ac9c6bc
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.serializer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+
+//
+// The code for finding all classes in a given package is adapted from the following source.
+// Credit: http://internna.blogspot.com/2007/11/java-5-retrieving-all-classes-from.html
+//
+/**
+ * Registry that maps on-wire names to Java classes.
+ * <p>
+ * {@link #scan()} walks every registered package (plain directories and JAR files
+ * reachable through the thread context class loader), loads the classes found there
+ * and records each one that carries an {@link OnwireName} annotation under the
+ * annotation's {@code name()}. The name "Object" is always pre-registered for
+ * {@code Object.class}.
+ * <p>
+ * Scanning is best-effort: classes or resources that cannot be loaded are skipped.
+ * This class does no synchronization of its own.
+ */
+public class OnwireClassRegistry {
+	
+	private List<String> packages = new ArrayList<String>();
+	private Map<String, Class<?>> registry =  new HashMap<String, Class<?>>();
+
+	/** Creates a registry with no packages; only "Object" is registered. */
+	public OnwireClassRegistry() {
+		registry.put("Object", Object.class);
+	}
+	
+	/**
+	 * Creates a registry that will scan a single package.
+	 *
+	 * @param packageName dotted package name, e.g. {@code org.example.messages}
+	 */
+	public OnwireClassRegistry(String packageName) {
+		// chain so that "Object" is registered no matter which constructor is used
+		this();
+		addPackage(packageName);
+	}
+	
+	/**
+	 * Creates a registry that will scan the given packages.
+	 *
+	 * @param packages dotted package names; copied into this registry, {@code null} is treated as empty
+	 */
+	public OnwireClassRegistry(List<String> packages) {
+		this();
+		if(packages != null) {
+			// "this." is required: the parameter shadows the field
+			this.packages.addAll(packages);
+		}
+	}
+	
+	/** @return the live list of package names that {@link #scan()} will visit */
+	public List<String> getPackages() {
+		return packages;
+	}
+	
+	/**
+	 * Replaces the package list. The given list is stored by reference, not copied.
+	 *
+	 * @param packages dotted package names to scan
+	 */
+	public void setPackages(List<String> packages) {
+		this.packages = packages;
+	}
+	
+	/**
+	 * Adds one package to the list to scan.
+	 *
+	 * @param packageName dotted package name
+	 */
+	public void addPackage(String packageName) {
+		packages.add(packageName);
+	}
+	
+	/**
+	 * Loads the classes of every registered package and registers those annotated
+	 * with {@link OnwireName}. A later class with the same on-wire name replaces an
+	 * earlier one.
+	 */
+	public void scan() {
+		Set<Class<?>> classes = new HashSet<Class<?>>();
+		for(String pkg : packages) {
+			classes.addAll(getClasses(pkg));
+		}
+		
+		for(Class<?> clz : classes) {
+			OnwireName onwire = clz.getAnnotation(OnwireName.class);
+			if(onwire != null) {
+				registry.put(onwire.name(), clz);
+			}
+		}
+	}
+	
+	/**
+	 * Looks up a class by its on-wire name.
+	 *
+	 * @param onwireName name as given in {@link OnwireName#name()}
+	 * @return the registered class, or {@code null} if the name is unknown
+	 */
+	public Class<?> getOnwireClass(String onwireName) {
+		return registry.get(onwireName);
+	}
+	
+	/**
+	 * Finds the classes of a package using the current thread's context class loader
+	 * to locate the package resources.
+	 *
+	 * @param packageName dotted package name
+	 * @return the classes that could be loaded; never {@code null}
+	 */
+	static Set<Class<?>> getClasses(String packageName) {
+		ClassLoader loader = Thread.currentThread().getContextClassLoader();
+		return getClasses(loader, packageName);
+	}
+
+	//
+	// Following helper methods can be put in a separated helper class,
+	// will do that later
+	//
+
+	/**
+	 * Finds the classes of a package in every directory or JAR file in which the
+	 * given loader locates that package.
+	 *
+	 * @param loader class loader used to locate the package resources
+	 * @param packageName dotted package name
+	 * @return the classes that could be loaded; never {@code null}. If locating or
+	 *         reading a resource fails, the classes collected so far are returned.
+	 */
+	static Set<Class<?>> getClasses(ClassLoader loader, String packageName) {
+		Set<Class<?>> classes = new HashSet<Class<?>>();
+		String path = packageName.replace('.', '/');
+		try {
+			Enumeration<URL> resources = loader.getResources(path);
+			if (resources != null) {
+				while (resources.hasMoreElements()) {
+					String filePath = resources.nextElement().getFile();
+					if (filePath == null) {
+						continue;
+					}
+					// WINDOWS HACK
+					if(filePath.indexOf("%20") > 0)
+						filePath = filePath.replaceAll("%20", " ");
+					if ((filePath.indexOf("!") > 0) && (filePath.indexOf(".jar") > 0)) {
+						// "file:/path/to.jar!/pkg" -> "/path/to.jar"
+						String jarPath = filePath.substring(0, filePath.indexOf("!"))
+							.substring(filePath.indexOf(":") + 1);
+						// WINDOWS HACK
+						if (jarPath.indexOf(":") >= 0) jarPath = jarPath.substring(1);
+						classes.addAll(getFromJARFile(jarPath, path));
+					} else {
+						classes.addAll(getFromDirectory(new File(filePath), packageName));
+					}
+				}
+			}
+		} catch(IOException ignored) {
+			// best-effort scan: keep whatever was collected before the failure
+		} catch(ClassNotFoundException ignored) {
+			// best-effort scan: keep whatever was collected before the failure
+		}
+		return classes;
+	}	
+	
+	/**
+	 * Recursively loads every {@code .class} file below a directory.
+	 *
+	 * @param directory directory that corresponds to {@code packageName}
+	 * @param packageName dotted package name of the classes directly inside {@code directory}
+	 * @return the classes that could be loaded; empty if the directory is missing or unreadable
+	 * @throws ClassNotFoundException declared for callers; load failures of individual classes are skipped
+	 */
+	static Set<Class<?>> getFromDirectory(File directory, String packageName) throws ClassNotFoundException {
+		Set<Class<?>> classes = new HashSet<Class<?>>();
+		// File.list() returns null when the path is not a directory or cannot be read
+		String[] entries = directory.exists() ? directory.list() : null;
+		if (entries == null) {
+			return classes;
+		}
+		for (String file : entries) {
+			if (file.endsWith(".class")) {
+				String name = packageName + '.' + stripFilenameExtension(file);
+				try {
+					classes.add(Class.forName(name));
+				} catch(ClassNotFoundException ignored) {
+					// not loadable from here; skip it
+				} catch(NoClassDefFoundError ignored) {
+					// a dependency of the class is missing; skip it, as the JAR scan does
+				} catch(Exception ignored) {
+					// any other load failure; skip it
+				}
+			} else {
+				File f = new File(directory, file);
+				if(f.isDirectory()) {
+					classes.addAll(getFromDirectory(f, packageName + "." + file));
+				}
+			}
+		}
+		return classes;
+	}
+	
+	/**
+	 * Loads every class of a package (and its sub-packages) stored in a JAR file.
+	 *
+	 * @param jar file system path of the JAR file
+	 * @param packageName package in path form, i.e. with '/' separators
+	 * @return the classes that could be loaded
+	 * @throws IOException if the JAR file cannot be opened or read
+	 * @throws ClassNotFoundException declared for callers; load failures of individual classes are skipped
+	 */
+	static Set<Class<?>> getFromJARFile(String jar, String packageName) throws IOException, ClassNotFoundException {
+		Set<Class<?>> classes = new HashSet<Class<?>>();
+		// match whole path segments so that "a/b" does not also pick up "a/bc/..."
+		String prefix = (packageName.length() == 0 || packageName.endsWith("/")) ? packageName : packageName + "/";
+		FileInputStream in = new FileInputStream(jar);
+		try {
+			JarInputStream jarFile = new JarInputStream(in);
+			try {
+				JarEntry jarEntry;
+				while ((jarEntry = jarFile.getNextJarEntry()) != null) {
+					String className = jarEntry.getName();
+					if (!className.endsWith(".class")) {
+						continue;
+					}
+					className = stripFilenameExtension(className);
+					if (className.startsWith(prefix)) {
+						try {
+							classes.add(Class.forName(className.replace('/', '.')));
+						} catch(ClassNotFoundException ignored) {
+							// not loadable from here; skip it
+						} catch(NoClassDefFoundError ignored) {
+							// a dependency of the class is missing; skip it
+						}
+					}
+				}
+			} finally {
+				jarFile.close();
+			}
+		} finally {
+			// also covers the case where the JarInputStream constructor itself failed
+			in.close();
+		}
+		
+		return classes;
+	}
+	
+	/**
+	 * Removes the last '.' and everything after it.
+	 *
+	 * @param file file name or path
+	 * @return the name without its extension, or {@code file} unchanged if it contains no '.'
+	 */
+	static String stripFilenameExtension(String file) {
+		int dot = file.lastIndexOf('.');
+		return dot < 0 ? file : file.substring(0, dot);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java
new file mode 100644
index 0000000..ac195d0
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.serializer;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that gives a class its on-wire name.
+ * OnwireClassRegistry.scan() registers every annotated class it finds under this name.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface OnwireName {
+	/** @return the name under which the annotated class is registered */
+	String name();
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
new file mode 100644
index 0000000..11bc428
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.server;
+
+import org.apache.cloudstack.framework.eventbus.EventBusBase;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+/**
+ * Event bus that extends EventBusBase and can be used as a TransportMultiplexier.
+ * The transport callback is not implemented yet: incoming transport messages are ignored.
+ */
+public class ServerEventBus extends EventBusBase implements TransportMultiplexier {
+
+	/**
+	 * Transport callback; currently an empty stub that discards the message.
+	 *
+	 * @param senderEndpointAddress address of the sending endpoint
+	 * @param targetEndpointAddress address of the receiving endpoint
+	 * @param multiplexer multiplexer name carried with the message
+	 * @param message message content
+	 */
+	@Override
+	public void onTransportMessage(String senderEndpointAddress,
+			String targetEndpointAddress, String multiplexer, String message) {
+		// TODO Auto-generated method stub
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java
new file mode 100644
index 0000000..d5dae2e
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.server;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportAddress;
+import org.apache.cloudstack.framework.transport.TransportDataPdu;
+import org.apache.cloudstack.framework.transport.TransportEndpoint;
+import org.apache.cloudstack.framework.transport.TransportEndpointSite;
+import org.apache.cloudstack.framework.transport.TransportPdu;
+import org.apache.cloudstack.framework.transport.TransportProvider;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
+
+/**
+ * {@link TransportProvider} that keeps its endpoints in an in-memory map and delivers
+ * PDUs to endpoints attached to this same provider. Output processing of an endpoint
+ * site runs on a fixed-size worker pool created by {@link #initialize()}.
+ * <p>
+ * Access to the endpoint map and to the endpoint id counter is guarded by this
+ * object's monitor. Forwarding to other nodes is not implemented; such PDUs are dropped.
+ */
+public class ServerTransportProvider implements TransportProvider {
+    private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
+	
+	public static final int DEFAULT_WORKER_POOL_SIZE = 5;
+	
+	private String _nodeId;
+
+	// endpoint id -> site; guarded by synchronized(this)
+	private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
+	private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
+	// created by initialize(); null until then
+	private ExecutorService _executor;
+	
+	// starts at a random value; guarded by synchronized(this)
+	private int _nextEndpointId = new Random().nextInt();
+
+	private MessageSerializer _messageSerializer;
+	
+	public ServerTransportProvider() {
+	}
+	
+	/** @return the id of the node this provider serves, as set by {@link #setNodeId(String)} */
+	public String getNodeId() { 
+		return _nodeId; 
+	}
+	
+	/**
+	 * @param nodeId id of the node this provider serves
+	 * @return this provider, for call chaining
+	 */
+	public ServerTransportProvider setNodeId(String nodeId) {
+		_nodeId = nodeId;
+		return this;
+	}
+	
+	/** @return the number of worker threads {@link #initialize()} will create */
+	public int getWorkerPoolSize() {
+		return _poolSize; 
+	}
+	
+	/**
+	 * Sets the worker pool size; only has an effect if called before {@link #initialize()}.
+	 *
+	 * @param poolSize number of worker threads, must be positive
+	 * @return this provider, for call chaining
+	 */
+	public ServerTransportProvider setWorkerPoolSize(int poolSize) {
+		assert(poolSize > 0);
+		
+		_poolSize = poolSize;
+		return this;
+	}
+	
+	@Override
+	public void setMessageSerializer(MessageSerializer messageSerializer) {
+		assert(messageSerializer != null);
+		_messageSerializer = messageSerializer;
+	}
+
+	@Override
+	public MessageSerializer getMessageSerializer() {
+		return _messageSerializer;
+	}
+	
+	/**
+	 * Creates the worker pool. Must be called before
+	 * {@link #requestSiteOutput(TransportEndpointSite)} is used.
+	 */
+	public void initialize() {
+		_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
+	}
+	
+	/**
+	 * Attaches an endpoint to this provider.
+	 * <p>
+	 * With a non-empty {@code predefinedAddress} the endpoint is registered under that
+	 * id with magic value 0; otherwise an id is generated. If a site is already
+	 * registered under the id, that existing site is returned and the endpoint is not
+	 * notified. Otherwise a new site is created and
+	 * {@code endpoint.onAttachConfirm(true, address)} is invoked.
+	 *
+	 * @param endpoint the endpoint to attach
+	 * @param predefinedAddress endpoint id to use, or {@code null}/empty to generate one
+	 * @return the site registered for the endpoint id
+	 */
+	@Override
+	public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
+		
+		TransportAddress transportAddress;
+		String endpointId;
+		if(predefinedAddress != null && !predefinedAddress.isEmpty()) {
+			endpointId = predefinedAddress;
+			transportAddress = new TransportAddress(_nodeId, endpointId, 0);
+		} else {
+			endpointId = String.valueOf(getNextEndpointId());
+			transportAddress = new TransportAddress(_nodeId, endpointId);
+		}
+		
+		TransportEndpointSite endpointSite;
+		synchronized(this) {
+			endpointSite = _endpointMap.get(endpointId);
+			if(endpointSite != null) {
+				// already attached
+				return endpointSite;
+			}
+			endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
+			_endpointMap.put(endpointId, endpointSite);
+		}
+		
+		// notify outside the lock so the callback cannot block other attach/detach calls
+		endpoint.onAttachConfirm(true, transportAddress.toString());
+		return endpointSite;
+	}
+
+	/**
+	 * Removes the first site whose endpoint is the given instance (identity comparison).
+	 *
+	 * @param endpoint the endpoint to detach
+	 * @return {@code true} if a site was removed, {@code false} if the endpoint was not attached
+	 */
+	@Override
+	public boolean detach(TransportEndpoint endpoint) {
+		synchronized(this) {
+			for(Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
+				if(entry.getValue().getEndpoint() == endpoint) {
+					// removing during iteration is safe only because we return immediately
+					_endpointMap.remove(entry.getKey());
+					return true;
+				}
+			}
+		}
+			
+		return false;
+	}
+	
+	/**
+	 * Schedules output processing of a site on the worker pool. Any exception thrown
+	 * by the site is logged and does not reach the caller.
+	 *
+	 * @param site the site whose pending output should be processed
+	 */
+	@Override
+	public void requestSiteOutput(final TransportEndpointSite site) {
+		_executor.execute(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					site.processOutput();
+					site.ackOutputProcessSignal();
+				} catch(Throwable e) {
+					s_logger.error("Unhandled exception", e);
+				}
+			}
+		});
+	}
+	
+	/**
+	 * Wraps a message into a {@link TransportDataPdu} and dispatches it.
+	 *
+	 * @param sourceEndpointAddress address string of the sender
+	 * @param targetEndpointAddress address string of the receiver
+	 * @param multiplexier multiplexier name carried with the message
+	 * @param message message content
+	 */
+	@Override
+	public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, 
+		String multiplexier, String message) {
+		
+		TransportDataPdu pdu = new TransportDataPdu();
+		pdu.setSourceAddress(sourceEndpointAddress);
+		pdu.setDestAddress(targetEndpointAddress);
+		pdu.setMultiplexier(multiplexier);
+		pdu.setContent(message);
+		
+		dispatchPdu(pdu);
+	}
+	
+	/**
+	 * Queues the PDU on the local endpoint site named by its destination address.
+	 * PDUs with an unparsable destination, an unknown local endpoint or a remote
+	 * node are dropped with a warning.
+	 */
+	private void dispatchPdu(TransportPdu pdu) {
+		
+		TransportAddress transportAddress = null;
+		try {
+			transportAddress = TransportAddress.fromAddressString(pdu.getDestAddress());
+		} catch(NumberFormatException e) {
+			// the trailing "magic" token was not an integer; handled as an invalid address below
+		}
+		
+		if(transportAddress == null) {
+			// fromAddressString() yields null for a null, empty or malformed address
+			s_logger.warn("Dropping PDU with invalid destination address: " + pdu.getDestAddress());
+			return;
+		}
+		
+		if(isLocalAddress(transportAddress)) {
+			TransportEndpointSite endpointSite = null;
+			synchronized(this) {
+				endpointSite = _endpointMap.get(transportAddress.getEndpointId());
+			}
+			
+			if(endpointSite != null) {
+				endpointSite.addOutputPdu(pdu);
+			} else {
+				s_logger.warn("Dropping PDU, no endpoint attached at: " + pdu.getDestAddress());
+			}
+		} else {
+			// do cross-node forwarding
+			// ???
+			s_logger.warn("Dropping PDU, cross-node forwarding is not implemented: " + pdu.getDestAddress());
+		}
+	}
+	
+	/** An address is local if its node id is this provider's node id or the local-service node id. */
+	private boolean isLocalAddress(TransportAddress address) {
+		return address.getNodeId().equals(_nodeId)
+			|| address.getNodeId().equals(TransportAddress.LOCAL_SERVICE_NODE);
+	}
+	
+	private synchronized int getNextEndpointId() {
+		return _nextEndpointId++;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java
new file mode 100644
index 0000000..4a5ad79
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.transport;
+
+import java.util.Random;
+
+/**
+ * Address of a transport endpoint: a node id, an endpoint id and an integer
+ * "magic" value. Its string form is {@code nodeId.endpointId.magic}
+ * (see {@link #toString()} and {@link #fromAddressString(String)}), which is why
+ * neither id may contain a '.'.
+ */
+public class TransportAddress {
+	/** Node id that denotes the local node (the empty string). */
+	public final static String LOCAL_SERVICE_NODE = "";
+	
+	private String _nodeId = LOCAL_SERVICE_NODE;
+	private String _endpointId;
+	private int _magic;
+	
+	/**
+	 * Creates an address with a randomly chosen magic value.
+	 *
+	 * @param nodeId node id, must not be null or contain '.'
+	 * @param endpointId endpoint id, must not be null or contain '.'
+	 */
+	public TransportAddress(String nodeId, String endpointId) {
+		this(nodeId, endpointId, new Random().nextInt());
+	}
+	
+	/**
+	 * Creates an address with an explicit magic value.
+	 *
+	 * @param nodeId node id, must not be null or contain '.'
+	 * @param endpointId endpoint id, must not be null or contain '.'
+	 * @param magic magic value of the address
+	 */
+	public TransportAddress(String nodeId, String endpointId, int magic) {
+		assert(nodeId != null);
+		assert(endpointId != null);
+		assert(nodeId.indexOf(".") < 0);
+		assert(endpointId.indexOf(".") < 0);
+		
+		_nodeId = nodeId;
+		_endpointId = endpointId;
+		_magic = magic;
+	}
+	
+	public String getNodeId() { 
+		return _nodeId; 
+	}
+	
+	/**
+	 * @param nodeId new node id
+	 * @return this address, for call chaining
+	 */
+	public TransportAddress setNodeId(String nodeId) {
+		_nodeId = nodeId;
+		return this;
+	}
+	
+	public String getEndpointId() {
+		return _endpointId;
+	}
+	
+	/**
+	 * @param endpointId new endpoint id
+	 * @return this address, for call chaining
+	 */
+	public TransportAddress setEndpointId(String endpointId) {
+		_endpointId = endpointId;
+		return this;
+	}
+	
+	/**
+	 * Parses the {@code nodeId.endpointId.magic} form produced by {@link #toString()}.
+	 *
+	 * @param addressString text to parse
+	 * @return the parsed address, or {@code null} if the text is null, empty, does not
+	 *         consist of exactly three '.'-separated tokens, or its magic token is not an integer
+	 */
+	public static TransportAddress fromAddressString(String addressString) {
+		if(addressString == null || addressString.isEmpty())
+			return null;
+		
+		String tokens[] = addressString.split("\\.");
+		if(tokens.length != 3)
+			return null;
+		
+		try {
+			return new TransportAddress(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
+		} catch(NumberFormatException e) {
+			// malformed magic token: report it like every other malformed address
+			return null;
+		}
+	}
+	
+	/**
+	 * Builds the address of a predefined endpoint on the local node (magic value 0).
+	 *
+	 * @param predefinedIdentifier endpoint id of the predefined endpoint
+	 * @return the local address for that endpoint
+	 */
+	public static TransportAddress getLocalPredefinedTransportAddress(String predefinedIdentifier) {
+		return new TransportAddress(LOCAL_SERVICE_NODE, predefinedIdentifier, 0);
+	}
+
+	@Override
+	public int hashCode() {
+		// ids may have been set to null through the setters; hash null as 0
+		int hashCode = _magic;
+		hashCode = (hashCode << 3) ^ (_nodeId == null ? 0 : _nodeId.hashCode());
+		hashCode = (hashCode << 3) ^ (_endpointId == null ? 0 : _endpointId.hashCode());
+		
+		return hashCode;
+	}
+	
+	@Override
+	public boolean equals(Object other) {
+		if(this == other)
+			return true;
+		
+		if(!(other instanceof TransportAddress))
+			return false;
+		
+		TransportAddress that = (TransportAddress)other;
+		return _magic == that._magic &&
+			sameText(_nodeId, that._nodeId) &&
+			sameText(_endpointId, that._endpointId);
+	}
+	
+	/** Null-safe string equality. */
+	private static boolean sameText(String a, String b) {
+		return a == null ? b == null : a.equals(b);
+	}
+	
+	/** @return {@code nodeId.endpointId.magic}; a null node id is written as empty text */
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		if(_nodeId != null)
+			sb.append(_nodeId);
+		sb.append(".");
+		sb.append(_endpointId);
+		sb.append(".");
+		sb.append(_magic);
+		
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java
new file mode 100644
index 0000000..6edb788
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.transport;
+
+/**
+ * Supplies an address as a String.
+ * NOTE(review): no implementation or caller is visible in this file; presumably the
+ * value is the string form of a TransportAddress - confirm against the implementers.
+ */
+public interface TransportAddressMapper {
+	/** @return the address as text */
+	String getAddress();
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java
new file mode 100644
index 0000000..ac9e06d
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.transport;
+
+import org.apache.cloudstack.framework.serializer.OnwireName;
+
+/**
+ * PDU that carries data: on top of what {@link TransportPdu} holds it stores a
+ * multiplexier name and a String content. Annotated with the on-wire name
+ * "TransportDataPdu".
+ */
+@OnwireName(name="TransportDataPdu")
+public class TransportDataPdu extends TransportPdu {
+
+	private String _multiplexier;
+	private String _content;
+	
+	/** Creates an empty PDU; both fields start out as {@code null}. */
+	public TransportDataPdu() {
+		super();
+	}
+	
+	/** @return the multiplexier name, or {@code null} if none was set */
+	public String getMultiplexier() {
+		return this._multiplexier;
+	}
+	
+	/** @param multiplexier the multiplexier name to carry */
+	public void setMultiplexier(String multiplexier) {
+		this._multiplexier = multiplexier;
+	}
+	
+	/** @return the content, or {@code null} if none was set */
+	public String getContent() {
+		return this._content;
+	}
+	
+	/** @param content the content to carry */
+	public void setContent(String content) {
+		this._content = content;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java
new file mode 100644
index 0000000..7767c35
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.transport;
+
+/**
+ * An endpoint that can be attached to a transport provider. It is also a
+ * TransportMultiplexier and adds callbacks for attach and detach notifications.
+ */
+public interface TransportEndpoint extends TransportMultiplexier {
+	/**
+	 * Attach notification. ServerTransportProvider.attach() calls this with
+	 * {@code true} and the string form of the new address after registering the endpoint.
+	 *
+	 * @param bSuccess whether the attach succeeded
+	 * @param endpointAddress address string assigned to the endpoint
+	 */
+	void onAttachConfirm(boolean bSuccess, String endpointAddress);
+	/**
+	 * Detach notification.
+	 * NOTE(review): no caller is visible in this file; presumably invoked when the
+	 * endpoint at {@code endpointAddress} is detached - confirm.
+	 *
+	 * @param endpointAddress address string of the endpoint concerned
+	 */
+	void onDetachIndication(String endpointAddress);
+}