You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/11/29 03:03:49 UTC

git commit: Finish RPC service server side implementation

Updated Branches:
  refs/heads/javelin 1d7506321 -> fc16e1ea1


Finish RPC service server side implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/fc16e1ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/fc16e1ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/fc16e1ea

Branch: refs/heads/javelin
Commit: fc16e1ea1aebb5f9adebcc890971f3c83efb155b
Parents: 1d75063
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed Nov 28 18:03:20 2012 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Nov 28 18:03:20 2012 -0800

----------------------------------------------------------------------
 .../framework/messaging/RpcClientCall.java         |    2 +
 .../framework/messaging/RpcClientCallImpl.java     |   34 ++++++--
 .../framework/messaging/RpcProviderImpl.java       |   43 ++++++++--
 .../framework/messaging/RpcServerCallImpl.java     |   71 +++++++++++++++
 4 files changed, 136 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
index d2adbfd..1ad03ef 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
@@ -32,6 +32,8 @@ public interface RpcClientCall {
 	Object getContextParam(String key);
 	
 	<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
+	RpcClientCall setCallbackDispatcherTarget(Object target);
+	
 	RpcClientCall setOneway();
 	
 	void apply();

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
index 574a273..e244f62 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
@@ -33,7 +33,8 @@ public class RpcClientCallImpl implements RpcClientCall {
 	private boolean _oneway = false;
 	
 	private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
-
+	private Object _callbackDispatcherTarget;
+	
 	private RpcProvider _rpcProvider;
 	private long _startTickInMs;
 	private long _callTag;
@@ -95,6 +96,13 @@ public class RpcClientCallImpl implements RpcClientCall {
 		_callbackListeners.add(listener);
 		return this;
 	}
+	
+	@Override
+	public RpcClientCall setCallbackDispatcherTarget(Object target) {
+		_callbackDispatcherTarget = target; // used on completion only when no callback listener is registered
+		return this;
+	}
+	
 
 	@Override
 	public RpcClientCall setOneway() {
@@ -189,11 +197,16 @@ public class RpcClientCallImpl implements RpcClientCall {
 			_responseDone = true;
 			_responseLock.notifyAll();
 		}
-		
-		assert(_rpcProvider.getMessageSerializer() != null);
-		Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
-		for(RpcCallbackListener listener: _callbackListeners)
-			listener.onSuccess(resultObject);
+	
+		if(_callbackListeners.size() > 0) {
+			assert(_rpcProvider.getMessageSerializer() != null);
+			Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
+			for(RpcCallbackListener listener: _callbackListeners)
+				listener.onSuccess(resultObject);
+		} else {
+			if(_callbackDispatcherTarget != null)
+				RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+		}
 	}
 	
 	public void complete(RpcException e) {
@@ -205,7 +218,12 @@ public class RpcClientCallImpl implements RpcClientCall {
 			_responseLock.notifyAll();
 		}
 		
-		for(RpcCallbackListener listener: _callbackListeners)
-			listener.onFailure(e);
+		if(_callbackListeners.size() > 0) {
+			for(RpcCallbackListener listener: _callbackListeners)
+				listener.onFailure(e);
+		} else {
+			if(_callbackDispatcherTarget != null)
+				RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
index c652982..ea69931 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
@@ -50,9 +50,9 @@ public class RpcProviderImpl implements RpcProvider {
 		
 		Object pdu = _messageSerializer.serializeFrom(message);
 		if(pdu instanceof RpcCallRequestPdu) {
-			handleCallRequestPdu((RpcCallRequestPdu)pdu);
+			handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu);
 		} else if(pdu instanceof RpcCallResponsePdu) {
-			handleCallResponsePdu((RpcCallResponsePdu)pdu);
+			handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu);
 		} else {
 			assert(false);
 		}
@@ -81,7 +81,7 @@ public class RpcProviderImpl implements RpcProvider {
 			_serviceEndpoints.remove(rpcEndpoint);
 		}
 	}
-
+	
 	@Override
 	public RpcClientCall newCall(String sourceAddress, String targetAddress) {
 		long callTag = getNextCallTag();
@@ -124,11 +124,42 @@ public class RpcProviderImpl implements RpcProvider {
 		return tag;
 	}
 	
-	private void handleCallRequestPdu(RpcCallRequestPdu pdu) {
-		// ???
+	private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) {
+		try {
+			RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
+			
+			// TODO, we are trying to avoid locking when calling into callbacks
+			// this can be optimized later
+			List<RpcServiceEndpoint> endpoints = new ArrayList<RpcServiceEndpoint>();
+			synchronized(_serviceEndpoints) {
+				endpoints.addAll(_serviceEndpoints);
+			}
+			// Dispatch on the snapshot, outside the lock; the first endpoint that returns true ends processing here.
+			for(RpcServiceEndpoint endpoint : endpoints) {
+				if(RpcServiceDispatcher.dispatch(endpoint, call))
+					return;
+			}
+			// No endpoint accepted the call: reply with RESULT_HANDLER_NOT_EXIST, echoing the request's identifiers.
+			RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
+			responsePdu.setCommand(pdu.getCommand());
+			responsePdu.setRequestStartTick(pdu.getRequestStartTick());
+			responsePdu.setRequestTag(pdu.getRequestTag());
+			responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST);
+			sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+			
+		} catch (Throwable e) {
+			// Any failure above is answered with RESULT_HANDLER_EXCEPTION; the Throwable itself is not logged or rethrown.
+			RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
+			responsePdu.setCommand(pdu.getCommand());
+			responsePdu.setRequestStartTick(pdu.getRequestStartTick());
+			responsePdu.setRequestTag(pdu.getRequestTag());
+			responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION);
+			
+			sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+		}
+	}
 	
-	private void handleCallResponsePdu(RpcCallResponsePdu pdu) {
+	private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) {
 		RpcClientCallImpl call = null;
 		
 		synchronized(this) {

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
new file mode 100644
index 0000000..75f521f
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public class RpcServerCallImpl implements RpcServerCall {
+	// Server-side handle for one inbound RPC request; completeCall() sends the reply through the provider.
+	private RpcProvider _rpcProvider;
+	private String _sourceAddress;
+	private String _targetAddress;
+	
+	private RpcCallRequestPdu _requestPdu;
+	
+	public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress, 
+		RpcCallRequestPdu requestPdu) {
+		// sourceAddress is the endpoint the request came from; targetAddress is the endpoint it was addressed to.
+		_rpcProvider = provider;
+		_sourceAddress = sourceAddress;
+		_targetAddress = targetAddress;
+		_requestPdu = requestPdu;
+	}
+
+	@Override
+	public String getCommand() {
+		assert(_requestPdu != null);
+		return _requestPdu.getCommand();
+	}
+
+	@Override
+	public Object getCommandArgument() {
+		if(_requestPdu.getSerializedCommandArg() == null)
+			return null;
+		// A non-null argument is handed to the provider's serializer to be turned back into an object.
+		assert(_rpcProvider.getMessageSerializer() != null);
+		return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg());
+	}
+
+	@Override
+	public void completeCall(Object returnObject) {
+		assert(_sourceAddress != null);
+		assert(_targetAddress != null);
+		// Echo the request's command, tag and start tick, then record the outcome with setResult().
+		RpcCallResponsePdu pdu = new RpcCallResponsePdu();
+		pdu.setCommand(_requestPdu.getCommand());
+		pdu.setRequestTag(_requestPdu.getRequestTag());
+		pdu.setRequestStartTick(_requestPdu.getRequestStartTick());
+		pdu.setResult(RpcCallResponsePdu.RESULT_SUCCESSFUL);
+		if(returnObject != null) {
+			assert(_rpcProvider.getMessageSerializer() != null);
+			pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject));
+		}
+		// Addresses are passed in swapped order (target, then source), as RpcProviderImpl does for its own replies.
+		_rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress, 
+			_rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu));
+	}
+}