You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/12/12 02:54:28 UTC

git commit: Hook server side RPC provider with a server side transport

Updated Branches:
  refs/heads/javelin 98a1295fb -> f52950689


Hook server side RPC provider with a server side transport


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/f5295068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/f5295068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/f5295068

Branch: refs/heads/javelin
Commit: f52950689b53d2be9d457a1fc494463d48df4552
Parents: 98a1295
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Dec 11 17:53:46 2012 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Tue Dec 11 17:53:46 2012 -0800

----------------------------------------------------------------------
 .../framework/messaging/RpcProvider.java           |    4 +-
 .../framework/messaging/RpcProviderImpl.java       |   79 +++++++++++----
 .../framework/messaging/TransportEndpoint.java     |   10 +--
 .../framework/messaging/TransportEndpointSite.java |   38 +++++++-
 .../framework/messaging/TransportProvider.java     |    2 +-
 .../messaging/client/ClientTransportEndpoint.java  |   23 +----
 .../messaging/client/ClientTransportProvider.java  |    5 +-
 .../messaging/server/ServerTransportProvider.java  |   26 ++---
 8 files changed, 112 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
index 3b766e8..b7c3fd6 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
@@ -23,6 +23,7 @@ public interface RpcProvider extends TransportMultiplexier {
 	
 	void setMessageSerializer(MessageSerializer messageSerializer);
 	MessageSerializer getMessageSerializer();
+	boolean initialize();
 	
 	void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
 	void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
@@ -33,9 +34,6 @@ public interface RpcProvider extends TransportMultiplexier {
 	//
 	// low-level public API
 	//
-	RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress);
-	RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress);
-
 	void registerCall(RpcClientCall call);
 	void cancelCall(RpcClientCall call);
 	

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
index d4355fa..409be59 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
@@ -24,9 +24,13 @@ import java.util.List;
 import java.util.Map;
 
 public class RpcProviderImpl implements RpcProvider {
+	public static final String RPC_MULTIPLEXIER = "rpc";
 	
 	private TransportProvider _transportProvider;
-	private MessageSerializer _messageSerializer;
+	private String _transportAddress;
+	private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint();	// transport attachment at RPC layer
+	
+	private MessageSerializer _messageSerializer = new JsonMessageSerializer();		// default message serializer
 	private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
 	private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
 	
@@ -60,6 +64,7 @@ public class RpcProviderImpl implements RpcProvider {
 
 	@Override
 	public void setMessageSerializer(MessageSerializer messageSerializer) {
+		assert(messageSerializer != null);
 		_messageSerializer = messageSerializer;
 	}
 
@@ -67,6 +72,17 @@ public class RpcProviderImpl implements RpcProvider {
 	public MessageSerializer getMessageSerializer() {
 		return _messageSerializer;
 	}
+	
+	@Override
+	public boolean initialize() {
+		assert(_transportProvider != null);
+		if(_transportProvider == null)
+			return false;
+		
+		TransportEndpointSite endpointSite = _transportProvider.attach(_transportEndpoint, "RpcProvider");
+		endpointSite.registerMultiplexier(RPC_MULTIPLEXIER, this);
+		return true;
+	}
 
 	@Override
 	public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
@@ -83,40 +99,38 @@ public class RpcProviderImpl implements RpcProvider {
 	}
 	
 	@Override
-	public RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress) {
+	public RpcClientCall newCall(String targetAddress) {
+
 		long callTag = getNextCallTag();
 		RpcClientCallImpl call = new RpcClientCallImpl(this);
-		call.setSourceAddress(sourceEndpoint.getEndpointAddress());
+		call.setSourceAddress(_transportAddress);
 		call.setTargetAddress(targetAddress);
 		call.setCallTag(callTag);
 		
-		return call;
-	}
-
-	@Override
-	public RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress) {
-		long callTag = getNextCallTag();
-		RpcClientCallImpl call = new RpcClientCallImpl(this);
-		call.setSourceAddress(sourceEndpoint.getEndpointAddress());
-		call.setTargetAddress(targetAddress.getAddress());
-		call.setCallTag(callTag);
+		RpcCallRequestPdu pdu = new RpcCallRequestPdu();
+		pdu.setCommand(call.getCommand());
+		pdu.setRequestTag(callTag);
+		pdu.setRequestStartTick(System.currentTimeMillis());
+		
+		String serializedCmdArg;
+		if(call.getCommandArg() != null)
+			serializedCmdArg = _messageSerializer.serializeTo(call.getCommandArg().getClass(), call.getCommandArg());
+		else
+			serializedCmdArg = _messageSerializer.serializeTo(Object.class, null);
+		pdu.setSerializedCommandArg(serializedCmdArg);
+		
+		String serializedPdu = _messageSerializer.serializeTo(RpcCallRequestPdu.class, pdu);
+		_transportProvider.sendMessage(_transportAddress, targetAddress, RPC_MULTIPLEXIER, 
+			serializedPdu);
 		
 		return call;
 	}
 	
 	@Override
-	public RpcClientCall newCall(String targetAddress) {
-
-		// ???
-		return null;
-	}
-	
-	@Override
 	public RpcClientCall newCall(TransportAddressMapper targetAddress) {
 		return newCall(targetAddress.getAddress());
 	}
 	
-	
 	@Override
 	public void registerCall(RpcClientCall call) {
 		assert(call != null);
@@ -210,4 +224,27 @@ public class RpcProviderImpl implements RpcProvider {
 			}
 		}
 	}
+	
+	private class RpcTransportEndpoint implements TransportEndpoint {
+
+		@Override
+		public void onTransportMessage(String senderEndpointAddress,
+			String targetEndpointAddress, String multiplexer, String message) {
+			
+			// we won't handle generic transport message toward RPC transport endpoint
+		}
+
+		@Override
+		public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
+			if(bSuccess) 
+				_transportAddress = endpointAddress;
+				
+		}
+
+		@Override
+		public void onDetachIndication(String endpointAddress) {
+			if(_transportAddress != null && _transportAddress.equals(endpointAddress))
+				_transportAddress = null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
index 0996bfd..fedfb35 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
@@ -18,15 +18,7 @@
  */
 package org.apache.cloudstack.framework.messaging;
 
-public interface TransportEndpoint {
-	String getEndpointAddress();
-	
+public interface TransportEndpoint extends TransportMultiplexier {
 	void onAttachConfirm(boolean bSuccess, String endpointAddress);
 	void onDetachIndication(String endpointAddress);
-	
-	void registerMultiplexier(String name, TransportMultiplexier multiplexier);
-	void unregisterMultiplexier(String name);
-	
-	void sendMessage(String soureEndpointAddress, String targetEndpointAddress, 
-		String multiplexier, String message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
index 4af7772..ca6155b 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
@@ -19,13 +19,16 @@
 package org.apache.cloudstack.framework.messaging;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class TransportEndpointSite {
 	private TransportEndpoint _endpoint;
 	private TransportAddress _address;
 	
 	private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
+	private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>();  
 	
 	public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
 		assert(endpoint != null);
@@ -47,6 +50,19 @@ public class TransportEndpointSite {
 		_address = address;
 	}
 	
+	public void registerMultiplexier(String name, TransportMultiplexier multiplexier) {
+		assert(name != null);
+		assert(multiplexier != null);
+		assert(_multiplexierMap.get(name) == null);
+		
+		_multiplexierMap.put(name, multiplexier);
+	}
+	
+	public void unregisterMultiplexier(String name) {
+		assert(name != null);
+		_multiplexierMap.remove(name);
+	}
+	
 	public void addOutputPdu(TransportPdu pdu) {
 		synchronized(this) {
 			_outputQueue.add(pdu);
@@ -66,8 +82,26 @@ public class TransportEndpointSite {
 	
 	private void processOutput() {
 		TransportPdu pdu;
-		while((pdu = getNextOutputPdu()) != null) {
-			// ???
+		TransportEndpoint endpoint = getEndpoint();
+
+		if(endpoint != null) {
+			while((pdu = getNextOutputPdu()) != null) {
+				if(pdu instanceof TransportDataPdu) {
+					String multiplexierName = ((TransportDataPdu) pdu).getMultiplexier();
+					TransportMultiplexier multiplexier = getRoutedMultiplexier(multiplexierName);
+					assert(multiplexier != null);
+					multiplexier.onTransportMessage(pdu.getSourceAddress(), pdu.getDestAddress(), 
+						multiplexierName, ((TransportDataPdu) pdu).getContent());
+				}
+			}
 		}
 	}
+	
+	private TransportMultiplexier getRoutedMultiplexier(String multiplexierName) {
+		TransportMultiplexier multiplexier = _multiplexierMap.get(multiplexierName);
+		if(multiplexier == null)
+			multiplexier = _endpoint;
+		
+		return multiplexier;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
index 132aa9a..bdbdd17 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
@@ -19,7 +19,7 @@
 package org.apache.cloudstack.framework.messaging;
 
 public interface TransportProvider {
-	boolean attach(TransportEndpoint endpoint, String predefinedAddress);
+	TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
 	boolean detach(TransportEndpoint endpoint);
 	
 	void sendMessage(String soureEndpointAddress, String targetEndpointAddress, 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
index a1c345e..44c8060 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
@@ -19,17 +19,10 @@
 package org.apache.cloudstack.framework.messaging.client;
 
 import org.apache.cloudstack.framework.messaging.TransportEndpoint;
-import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
 
 public class ClientTransportEndpoint implements TransportEndpoint {
 
 	@Override
-	public String getEndpointAddress() {
-		// ???
-		return "";
-	}
-	
-	@Override
 	public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
 		// TODO Auto-generated method stub
 	}
@@ -39,19 +32,9 @@ public class ClientTransportEndpoint implements TransportEndpoint {
 	}
 
 	@Override
-	public void registerMultiplexier(String name,
-			TransportMultiplexier multiplexier) {
-		// TODO Auto-generated method stub
-	}
-
-	@Override
-	public void unregisterMultiplexier(String name) {
-		// TODO Auto-generated method stub
-	}
-
-	@Override
-	public void sendMessage(String sourceEndpointAddress,
-			String targetEndpointAddress, String multiplexier, String message) {
+	public void onTransportMessage(String senderEndpointAddress,
+			String targetEndpointAddress, String multiplexer, String message) {
 		// TODO Auto-generated method stub
+		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
index 2189c7a..60c07c3 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
@@ -19,14 +19,15 @@
 package org.apache.cloudstack.framework.messaging.client;
 
 import org.apache.cloudstack.framework.messaging.TransportEndpoint;
+import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
 import org.apache.cloudstack.framework.messaging.TransportProvider;
 
 public class ClientTransportProvider implements TransportProvider {
 
 	@Override
-	public boolean attach(TransportEndpoint endpoint, String predefinedAddress) {
+	public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
 		// TODO Auto-generated method stub
-		return false;
+		return null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
index 02674cb..3372b75 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
@@ -45,7 +45,7 @@ public class ServerTransportProvider implements TransportProvider {
 	}
 	
 	@Override
-	public boolean attach(TransportEndpoint endpoint, String predefinedAddress) {
+	public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
 		
 		TransportAddress transportAddress;
 		String endpointId;
@@ -62,35 +62,26 @@ public class ServerTransportProvider implements TransportProvider {
 			endpointSite = _endpointMap.get(endpointId);
 			if(endpointSite != null) {
 				// already attached
-				return false;
+				return endpointSite;
 			}
 			endpointSite = new TransportEndpointSite(endpoint, transportAddress);
 			_endpointMap.put(endpointId, endpointSite);
 		}
 		
 		endpoint.onAttachConfirm(true, transportAddress.toString());
-		return true;
+		return endpointSite;
 	}
 
 	@Override
 	public boolean detach(TransportEndpoint endpoint) {
-		TransportAddress transportAddress = TransportAddress.fromAddressString(endpoint.getEndpointAddress());
-		if(transportAddress == null)
-			return false;
-		
-		boolean found = false;
 		synchronized(this) {
-			TransportEndpointSite endpointSite = _endpointMap.get(transportAddress.getEndpointId());
-			if(endpointSite.getAddress().equals(transportAddress)) {
-				found = true;
-				_endpointMap.remove(transportAddress.getEndpointId());
+			for(Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
+				if(entry.getValue().getEndpoint() == endpoint) {
+					_endpointMap.remove(entry.getKey());
+					return true;
+				}
 			}
 		}
-		
-		if(found) {
-			endpoint.onDetachIndication(endpoint.getEndpointAddress());
-			return true;
-		}
 			
 		return false;
 	}
@@ -122,6 +113,7 @@ public class ServerTransportProvider implements TransportProvider {
 				endpointSite.addOutputPdu(pdu);
 		} else {
 			// do cross-node forwarding
+			// ???
 		}
 	}