You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/11/29 03:03:49 UTC
git commit: Finish RPC service server side implementation
Updated Branches:
refs/heads/javelin 1d7506321 -> fc16e1ea1
Finish RPC service server side implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/fc16e1ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/fc16e1ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/fc16e1ea
Branch: refs/heads/javelin
Commit: fc16e1ea1aebb5f9adebcc890971f3c83efb155b
Parents: 1d75063
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed Nov 28 18:03:20 2012 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Nov 28 18:03:20 2012 -0800
----------------------------------------------------------------------
.../framework/messaging/RpcClientCall.java | 2 +
.../framework/messaging/RpcClientCallImpl.java | 34 ++++++--
.../framework/messaging/RpcProviderImpl.java | 43 ++++++++--
.../framework/messaging/RpcServerCallImpl.java | 71 +++++++++++++++
4 files changed, 136 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
index d2adbfd..1ad03ef 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java
@@ -32,6 +32,8 @@ public interface RpcClientCall {
Object getContextParam(String key);
<T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
+ RpcClientCall setCallbackDispatcherTarget(Object target);
+
RpcClientCall setOneway();
void apply();
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
index 574a273..e244f62 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java
@@ -33,7 +33,8 @@ public class RpcClientCallImpl implements RpcClientCall {
private boolean _oneway = false;
private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
-
+ private Object _callbackDispatcherTarget;
+
private RpcProvider _rpcProvider;
private long _startTickInMs;
private long _callTag;
@@ -95,6 +96,13 @@ public class RpcClientCallImpl implements RpcClientCall {
_callbackListeners.add(listener);
return this;
}
+ // Records the object passed to RpcCallbackDispatcher.dispatch() when the call completes and no callback listeners are registered.
+ @Override
+ public RpcClientCall setCallbackDispatcherTarget(Object target) {
+ _callbackDispatcherTarget = target;
+ return this;
+ }
+
@Override
public RpcClientCall setOneway() {
@@ -189,11 +197,16 @@ public class RpcClientCallImpl implements RpcClientCall {
_responseDone = true;
_responseLock.notifyAll();
}
-
- assert(_rpcProvider.getMessageSerializer() != null);
- Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
- for(RpcCallbackListener listener: _callbackListeners)
- listener.onSuccess(resultObject);
+
+ if(_callbackListeners.size() > 0) {
+ assert(_rpcProvider.getMessageSerializer() != null);
+ Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
+ for(RpcCallbackListener listener: _callbackListeners)
+ listener.onSuccess(resultObject);
+ } else {
+ if(_callbackDispatcherTarget != null)
+ RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+ }
}
public void complete(RpcException e) {
@@ -205,7 +218,12 @@ public class RpcClientCallImpl implements RpcClientCall {
_responseLock.notifyAll();
}
- for(RpcCallbackListener listener: _callbackListeners)
- listener.onFailure(e);
+ if(_callbackListeners.size() > 0) {
+ for(RpcCallbackListener listener: _callbackListeners)
+ listener.onFailure(e);
+ } else {
+ if(_callbackDispatcherTarget != null)
+ RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
index c652982..ea69931 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java
@@ -50,9 +50,9 @@ public class RpcProviderImpl implements RpcProvider {
Object pdu = _messageSerializer.serializeFrom(message);
if(pdu instanceof RpcCallRequestPdu) {
- handleCallRequestPdu((RpcCallRequestPdu)pdu);
+ handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu);
} else if(pdu instanceof RpcCallResponsePdu) {
- handleCallResponsePdu((RpcCallResponsePdu)pdu);
+ handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu);
} else {
assert(false);
}
@@ -81,7 +81,7 @@ public class RpcProviderImpl implements RpcProvider {
_serviceEndpoints.remove(rpcEndpoint);
}
}
-
+
@Override
public RpcClientCall newCall(String sourceAddress, String targetAddress) {
long callTag = getNextCallTag();
@@ -124,11 +124,42 @@ public class RpcProviderImpl implements RpcProvider {
return tag;
}
- private void handleCallRequestPdu(RpcCallRequestPdu pdu) {
- // ???
+    // Dispatches an inbound request to the registered service endpoints; when no endpoint
+    // handles it, or dispatching throws, an error response is sent back to the caller.
+    private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) {
+        try {
+            RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
+            // TODO, we are trying to avoid locking when calling into callbacks
+            // this can be optimized later
+            List<RpcServiceEndpoint> endpoints;
+            synchronized(_serviceEndpoints) {
+                endpoints = new ArrayList<RpcServiceEndpoint>(_serviceEndpoints);
+            }
+            for(RpcServiceEndpoint endpoint : endpoints) {
+                if(RpcServiceDispatcher.dispatch(endpoint, call))
+                    return;
+            }
+            RpcCallResponsePdu responsePdu = newResponsePdu(pdu);
+            responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST);
+            sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+        } catch (Throwable e) {
+            // NOTE(review): the caught Throwable is neither logged nor forwarded; the caller only sees RESULT_HANDLER_EXCEPTION.
+            RpcCallResponsePdu responsePdu = newResponsePdu(pdu);
+            responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION);
+            sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+        }
+    }
+
+    // Creates a response that echoes the request's command, start tick and tag; the caller sets the result code.
+    private RpcCallResponsePdu newResponsePdu(RpcCallRequestPdu pdu) {
+        RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
+        responsePdu.setCommand(pdu.getCommand());
+        responsePdu.setRequestStartTick(pdu.getRequestStartTick());
+        responsePdu.setRequestTag(pdu.getRequestTag());
+        return responsePdu;
+    }
- private void handleCallResponsePdu(RpcCallResponsePdu pdu) {
+ private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) {
RpcClientCallImpl call = null;
synchronized(this) {
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/fc16e1ea/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
new file mode 100644
index 0000000..75f521f
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.messaging;
+
+public class RpcServerCallImpl implements RpcServerCall {
+
+ private RpcProvider _rpcProvider;
+ private String _sourceAddress;
+ private String _targetAddress;
+
+ private RpcCallRequestPdu _requestPdu;
+
+ public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress,
+ RpcCallRequestPdu requestPdu) {
+
+ _rpcProvider = provider;
+ _sourceAddress = sourceAddress;
+ _targetAddress = targetAddress;
+ _requestPdu = requestPdu;
+ }
+
+ @Override
+ public String getCommand() {
+ assert(_requestPdu != null);
+ return _requestPdu.getCommand();
+ }
+
+ @Override
+ public Object getCommandArgument() {
+ if(_requestPdu.getSerializedCommandArg() == null)
+ return null;
+
+ assert(_rpcProvider.getMessageSerializer() != null);
+ return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg());
+ }
+
+ @Override
+ public void completeCall(Object returnObject) {
+ assert(_sourceAddress != null);
+ assert(_targetAddress != null);
+
+ RpcCallResponsePdu pdu = new RpcCallResponsePdu();
+ pdu.setCommand(_requestPdu.getCommand());
+ pdu.setRequestTag(_requestPdu.getRequestTag());
+ pdu.setRequestStartTick(_requestPdu.getRequestStartTick());
+ pdu.setRequestStartTick(RpcCallResponsePdu.RESULT_SUCCESSFUL);
+ if(returnObject != null) {
+ assert(_rpcProvider.getMessageSerializer() != null);
+ pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject));
+ }
+
+ _rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress,
+ _rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu));
+ }
+}