You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/12/12 02:54:28 UTC
git commit: Hook server side RPC provider with a server side transport
Updated Branches:
refs/heads/javelin 98a1295fb -> f52950689
Hook server side RPC provider with a server side transport
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/f5295068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/f5295068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/f5295068
Branch: refs/heads/javelin
Commit: f52950689b53d2be9d457a1fc494463d48df4552
Parents: 98a1295
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Dec 11 17:53:46 2012 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Tue Dec 11 17:53:46 2012 -0800
----------------------------------------------------------------------
.../framework/messaging/RpcProvider.java | 4 +-
.../framework/messaging/RpcProviderImpl.java | 79 +++++++++++----
.../framework/messaging/TransportEndpoint.java | 10 +--
.../framework/messaging/TransportEndpointSite.java | 38 +++++++-
.../framework/messaging/TransportProvider.java | 2 +-
.../messaging/client/ClientTransportEndpoint.java | 23 +----
.../messaging/client/ClientTransportProvider.java | 5 +-
.../messaging/server/ServerTransportProvider.java | 26 ++---
8 files changed, 112 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
index 3b766e8..b7c3fd6 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java
@@ -23,6 +23,7 @@ public interface RpcProvider extends TransportMultiplexier {
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
+ boolean initialize();
void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
@@ -33,9 +34,6 @@ public interface RpcProvider extends TransportMultiplexier {
//
// low-level public API
//
- RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress);
- RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress);
-
void registerCall(RpcClientCall call);
void cancelCall(RpcClientCall call);
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
index d4355fa..409be59 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
@@ -24,9 +24,13 @@ import java.util.List;
import java.util.Map;
public class RpcProviderImpl implements RpcProvider {
+ public static final String RPC_MULTIPLEXIER = "rpc";
private TransportProvider _transportProvider;
- private MessageSerializer _messageSerializer;
+ private String _transportAddress;
+ private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint(); // transport attachment at RPC layer
+
+ private MessageSerializer _messageSerializer = new JsonMessageSerializer(); // default message serializer
private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
@@ -60,6 +64,7 @@ public class RpcProviderImpl implements RpcProvider {
@Override
public void setMessageSerializer(MessageSerializer messageSerializer) {
+ assert(messageSerializer != null);
_messageSerializer = messageSerializer;
}
@@ -67,6 +72,17 @@ public class RpcProviderImpl implements RpcProvider {
public MessageSerializer getMessageSerializer() {
return _messageSerializer;
}
+
+ @Override
+ public boolean initialize() {
+ assert(_transportProvider != null);
+ if(_transportProvider == null)
+ return false;
+
+ TransportEndpointSite endpointSite = _transportProvider.attach(_transportEndpoint, "RpcProvider");
+ endpointSite.registerMultiplexier(RPC_MULTIPLEXIER, this);
+ return true;
+ }
@Override
public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
@@ -83,40 +99,38 @@ public class RpcProviderImpl implements RpcProvider {
}
@Override
- public RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress) {
+ public RpcClientCall newCall(String targetAddress) {
+
long callTag = getNextCallTag();
RpcClientCallImpl call = new RpcClientCallImpl(this);
- call.setSourceAddress(sourceEndpoint.getEndpointAddress());
+ call.setSourceAddress(_transportAddress);
call.setTargetAddress(targetAddress);
call.setCallTag(callTag);
- return call;
- }
-
- @Override
- public RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress) {
- long callTag = getNextCallTag();
- RpcClientCallImpl call = new RpcClientCallImpl(this);
- call.setSourceAddress(sourceEndpoint.getEndpointAddress());
- call.setTargetAddress(targetAddress.getAddress());
- call.setCallTag(callTag);
+ RpcCallRequestPdu pdu = new RpcCallRequestPdu();
+ pdu.setCommand(call.getCommand());
+ pdu.setRequestTag(callTag);
+ pdu.setRequestStartTick(System.currentTimeMillis());
+
+ String serializedCmdArg;
+ if(call.getCommandArg() != null)
+ serializedCmdArg = _messageSerializer.serializeTo(call.getCommandArg().getClass(), call.getCommandArg());
+ else
+ serializedCmdArg = _messageSerializer.serializeTo(Object.class, null);
+ pdu.setSerializedCommandArg(serializedCmdArg);
+
+ String serializedPdu = _messageSerializer.serializeTo(RpcCallRequestPdu.class, pdu);
+ _transportProvider.sendMessage(_transportAddress, targetAddress, RPC_MULTIPLEXIER,
+ serializedPdu);
return call;
}
@Override
- public RpcClientCall newCall(String targetAddress) {
-
- // ???
- return null;
- }
-
- @Override
public RpcClientCall newCall(TransportAddressMapper targetAddress) {
return newCall(targetAddress.getAddress());
}
-
@Override
public void registerCall(RpcClientCall call) {
assert(call != null);
@@ -210,4 +224,27 @@ public class RpcProviderImpl implements RpcProvider {
}
}
}
+
+ private class RpcTransportEndpoint implements TransportEndpoint {
+
+ @Override
+ public void onTransportMessage(String senderEndpointAddress,
+ String targetEndpointAddress, String multiplexer, String message) {
+
+ // we won't handle generic transport message toward RPC transport endpoint
+ }
+
+ @Override
+ public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
+ if(bSuccess)
+ _transportAddress = endpointAddress;
+
+ }
+
+ @Override
+ public void onDetachIndication(String endpointAddress) {
+ if(_transportAddress != null && _transportAddress.equals(endpointAddress))
+ _transportAddress = null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
index 0996bfd..fedfb35 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java
@@ -18,15 +18,7 @@
*/
package org.apache.cloudstack.framework.messaging;
-public interface TransportEndpoint {
- String getEndpointAddress();
-
+public interface TransportEndpoint extends TransportMultiplexier {
void onAttachConfirm(boolean bSuccess, String endpointAddress);
void onDetachIndication(String endpointAddress);
-
- void registerMultiplexier(String name, TransportMultiplexier multiplexier);
- void unregisterMultiplexier(String name);
-
- void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
- String multiplexier, String message);
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
index 4af7772..ca6155b 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java
@@ -19,13 +19,16 @@
package org.apache.cloudstack.framework.messaging;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class TransportEndpointSite {
private TransportEndpoint _endpoint;
private TransportAddress _address;
private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
+ private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>();
public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
assert(endpoint != null);
@@ -47,6 +50,19 @@ public class TransportEndpointSite {
_address = address;
}
+ public void registerMultiplexier(String name, TransportMultiplexier multiplexier) {
+ assert(name != null);
+ assert(multiplexier != null);
+ assert(_multiplexierMap.get(name) == null);
+
+ _multiplexierMap.put(name, multiplexier);
+ }
+
+ public void unregisterMultiplexier(String name) {
+ assert(name != null);
+ _multiplexierMap.remove(name);
+ }
+
public void addOutputPdu(TransportPdu pdu) {
synchronized(this) {
_outputQueue.add(pdu);
@@ -66,8 +82,26 @@ public class TransportEndpointSite {
private void processOutput() {
TransportPdu pdu;
- while((pdu = getNextOutputPdu()) != null) {
- // ???
+ TransportEndpoint endpoint = getEndpoint();
+
+ if(endpoint != null) {
+ while((pdu = getNextOutputPdu()) != null) {
+ if(pdu instanceof TransportDataPdu) {
+ String multiplexierName = ((TransportDataPdu) pdu).getMultiplexier();
+ TransportMultiplexier multiplexier = getRoutedMultiplexier(multiplexierName);
+ assert(multiplexier != null);
+ multiplexier.onTransportMessage(pdu.getSourceAddress(), pdu.getDestAddress(),
+ multiplexierName, ((TransportDataPdu) pdu).getContent());
+ }
+ }
}
}
+
+ private TransportMultiplexier getRoutedMultiplexier(String multiplexierName) {
+ TransportMultiplexier multiplexier = _multiplexierMap.get(multiplexierName);
+ if(multiplexier == null)
+ multiplexier = _endpoint;
+
+ return multiplexier;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
index 132aa9a..bdbdd17 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java
@@ -19,7 +19,7 @@
package org.apache.cloudstack.framework.messaging;
public interface TransportProvider {
- boolean attach(TransportEndpoint endpoint, String predefinedAddress);
+ TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
boolean detach(TransportEndpoint endpoint);
void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
index a1c345e..44c8060 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java
@@ -19,17 +19,10 @@
package org.apache.cloudstack.framework.messaging.client;
import org.apache.cloudstack.framework.messaging.TransportEndpoint;
-import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
public class ClientTransportEndpoint implements TransportEndpoint {
@Override
- public String getEndpointAddress() {
- // ???
- return "";
- }
-
- @Override
public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
// TODO Auto-generated method stub
}
@@ -39,19 +32,9 @@ public class ClientTransportEndpoint implements TransportEndpoint {
}
@Override
- public void registerMultiplexier(String name,
- TransportMultiplexier multiplexier) {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void unregisterMultiplexier(String name) {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void sendMessage(String sourceEndpointAddress,
- String targetEndpointAddress, String multiplexier, String message) {
+ public void onTransportMessage(String senderEndpointAddress,
+ String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
index 2189c7a..60c07c3 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java
@@ -19,14 +19,15 @@
package org.apache.cloudstack.framework.messaging.client;
import org.apache.cloudstack.framework.messaging.TransportEndpoint;
+import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
import org.apache.cloudstack.framework.messaging.TransportProvider;
public class ClientTransportProvider implements TransportProvider {
@Override
- public boolean attach(TransportEndpoint endpoint, String predefinedAddress) {
+ public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
// TODO Auto-generated method stub
- return false;
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/f5295068/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
index 02674cb..3372b75 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
@@ -45,7 +45,7 @@ public class ServerTransportProvider implements TransportProvider {
}
@Override
- public boolean attach(TransportEndpoint endpoint, String predefinedAddress) {
+ public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
TransportAddress transportAddress;
String endpointId;
@@ -62,35 +62,26 @@ public class ServerTransportProvider implements TransportProvider {
endpointSite = _endpointMap.get(endpointId);
if(endpointSite != null) {
// already attached
- return false;
+ return endpointSite;
}
endpointSite = new TransportEndpointSite(endpoint, transportAddress);
_endpointMap.put(endpointId, endpointSite);
}
endpoint.onAttachConfirm(true, transportAddress.toString());
- return true;
+ return endpointSite;
}
@Override
public boolean detach(TransportEndpoint endpoint) {
- TransportAddress transportAddress = TransportAddress.fromAddressString(endpoint.getEndpointAddress());
- if(transportAddress == null)
- return false;
-
- boolean found = false;
synchronized(this) {
- TransportEndpointSite endpointSite = _endpointMap.get(transportAddress.getEndpointId());
- if(endpointSite.getAddress().equals(transportAddress)) {
- found = true;
- _endpointMap.remove(transportAddress.getEndpointId());
+ for(Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
+ if(entry.getValue().getEndpoint() == endpoint) {
+ _endpointMap.remove(entry.getKey());
+ return true;
+ }
}
}
-
- if(found) {
- endpoint.onDetachIndication(endpoint.getEndpointAddress());
- return true;
- }
return false;
}
@@ -122,6 +113,7 @@ public class ServerTransportProvider implements TransportProvider {
endpointSite.addOutputPdu(pdu);
} else {
// do cross-node forwarding
+ // ???
}
}