You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/12/14 04:32:01 UTC
[3/4] Refactor and finalize framework IPC java package structure
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java
deleted file mode 100644
index ff02cb8..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging.server;
-
-import org.apache.cloudstack.framework.messaging.EventBusBase;
-import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
-
-public class ServerEventBus extends EventBusBase implements TransportMultiplexier {
-
- @Override
- public void onTransportMessage(String senderEndpointAddress,
- String targetEndpointAddress, String multiplexer, String message) {
- // TODO Auto-generated method stub
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
deleted file mode 100644
index 98177d6..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging.server;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.cloudstack.framework.messaging.MessageSerializer;
-import org.apache.cloudstack.framework.messaging.TransportAddress;
-import org.apache.cloudstack.framework.messaging.TransportDataPdu;
-import org.apache.cloudstack.framework.messaging.TransportEndpoint;
-import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
-import org.apache.cloudstack.framework.messaging.TransportPdu;
-import org.apache.cloudstack.framework.messaging.TransportProvider;
-import org.apache.log4j.Logger;
-
-import com.cloud.utils.concurrency.NamedThreadFactory;
-
-public class ServerTransportProvider implements TransportProvider {
- private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
-
- public static final int DEFAULT_WORKER_POOL_SIZE = 5;
-
- private String _nodeId;
-
- private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
- private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
- private ExecutorService _executor;
-
- private int _nextEndpointId = new Random().nextInt();
-
- private MessageSerializer _messageSerializer;
-
- public ServerTransportProvider() {
- }
-
- public String getNodeId() {
- return _nodeId;
- }
-
- public ServerTransportProvider setNodeId(String nodeId) {
- _nodeId = nodeId;
- return this;
- }
-
- public int getWorkerPoolSize() {
- return _poolSize;
- }
-
- public ServerTransportProvider setWorkerPoolSize(int poolSize) {
- assert(poolSize > 0);
-
- _poolSize = poolSize;
- return this;
- }
-
- @Override
- public void setMessageSerializer(MessageSerializer messageSerializer) {
- assert(messageSerializer != null);
- _messageSerializer = messageSerializer;
- }
-
- @Override
- public MessageSerializer getMessageSerializer() {
- return _messageSerializer;
- }
-
- public void initialize() {
- _executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
- }
-
- @Override
- public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
-
- TransportAddress transportAddress;
- String endpointId;
- if(predefinedAddress != null && !predefinedAddress.isEmpty()) {
- endpointId = predefinedAddress;
- transportAddress = new TransportAddress(_nodeId, endpointId, 0);
- } else {
- endpointId = String.valueOf(getNextEndpointId());
- transportAddress = new TransportAddress(_nodeId, endpointId);
- }
-
- TransportEndpointSite endpointSite;
- synchronized(this) {
- endpointSite = _endpointMap.get(endpointId);
- if(endpointSite != null) {
- // already attached
- return endpointSite;
- }
- endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
- _endpointMap.put(endpointId, endpointSite);
- }
-
- endpoint.onAttachConfirm(true, transportAddress.toString());
- return endpointSite;
- }
-
- @Override
- public boolean detach(TransportEndpoint endpoint) {
- synchronized(this) {
- for(Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
- if(entry.getValue().getEndpoint() == endpoint) {
- _endpointMap.remove(entry.getKey());
- return true;
- }
- }
- }
-
- return false;
- }
-
- @Override
- public void requestSiteOutput(final TransportEndpointSite site) {
- _executor.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- site.processOutput();
- site.ackOutputProcessSignal();
- } catch(Throwable e) {
- s_logger.error("Unhandled exception", e);
- }
- }
- });
- }
-
- @Override
- public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress,
- String multiplexier, String message) {
-
- TransportDataPdu pdu = new TransportDataPdu();
- pdu.setSourceAddress(sourceEndpointAddress);
- pdu.setDestAddress(targetEndpointAddress);
- pdu.setMultiplexier(multiplexier);
- pdu.setContent(message);
-
- dispatchPdu(pdu);
- }
-
- private void dispatchPdu(TransportPdu pdu) {
-
- TransportAddress transportAddress = TransportAddress.fromAddressString(pdu.getDestAddress());
-
- if(isLocalAddress(transportAddress)) {
- TransportEndpointSite endpointSite = null;
- synchronized(this) {
- endpointSite = _endpointMap.get(transportAddress.getEndpointId());
- }
-
- if(endpointSite != null)
- endpointSite.addOutputPdu(pdu);
- } else {
- // do cross-node forwarding
- // ???
- }
- }
-
- private boolean isLocalAddress(TransportAddress address) {
- if(address.getNodeId().equals(_nodeId) || address.getNodeId().equals(TransportAddress.LOCAL_SERVICE_NODE))
- return true;
-
- return false;
- }
-
- private synchronized int getNextEndpointId() {
- return _nextEndpointId++;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java
new file mode 100644
index 0000000..b85316e
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallRequestPdu.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import org.apache.cloudstack.framework.serializer.OnwireName;
+
+/**
+ * Request side of an RPC exchange. RpcClientCallImpl.apply() fills one of these with the
+ * command, the serialized command argument and the call tag, serializes it and hands it
+ * to the RpcProvider for sending.
+ *
+ * NOTE(review): the field set is presumably part of the on-wire format produced by the
+ * message serializer - confirm before renaming or reordering fields.
+ */
+@OnwireName(name="RpcRequest")
+public class RpcCallRequestPdu {
+
+ // tag identifying the call this request belongs to (set from RpcClientCallImpl.getCallTag())
+ private long requestTag;
+ // creation time of this PDU in milliseconds, see the constructor
+ private long requestStartTick;
+
+ // name of the command being invoked
+ private String command;
+ // command argument in serialized (string) form; left null when the call has no argument
+ private String serializedCommandArg;
+
+ /**
+ * Creates a request with tag 0 and the start tick set to the current wall-clock time
+ * (System.currentTimeMillis()).
+ */
+ public RpcCallRequestPdu() {
+ requestTag = 0;
+ requestStartTick = System.currentTimeMillis();
+ }
+
+ /** @return the tag identifying the call this request belongs to */
+ public long getRequestTag() {
+ return requestTag;
+ }
+
+ /** @param requestTag the tag identifying the call this request belongs to */
+ public void setRequestTag(long requestTag) {
+ this.requestTag = requestTag;
+ }
+
+ /** @return the start tick in milliseconds */
+ public long getRequestStartTick() {
+ return requestStartTick;
+ }
+
+ /** @param requestStartTick the start tick in milliseconds */
+ public void setRequestStartTick(long requestStartTick) {
+ this.requestStartTick = requestStartTick;
+ }
+
+ /** @return the command name */
+ public String getCommand() {
+ return command;
+ }
+
+ /** @param command the command name */
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ /** @return the serialized command argument, may be null */
+ public String getSerializedCommandArg() {
+ return serializedCommandArg;
+ }
+
+ /** @param serializedCommandArg the command argument in serialized form */
+ public void setSerializedCommandArg(String serializedCommandArg) {
+ this.serializedCommandArg = serializedCommandArg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java
new file mode 100644
index 0000000..f6cd0a0
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallResponsePdu.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import org.apache.cloudstack.framework.serializer.OnwireName;
+
+/**
+ * Response side of an RPC exchange: carries a result code, the command name and the
+ * serialized result, together with the tag and start tick of the request.
+ *
+ * NOTE(review): the field set is presumably part of the on-wire format produced by the
+ * message serializer - confirm before renaming or reordering fields.
+ */
+@OnwireName(name="RpcResponse")
+public class RpcCallResponsePdu {
+ // Values for the result field.
+ // NOTE(review): meanings below are inferred from the constant names - confirm against the RPC provider.
+ public static final int RESULT_SUCCESSFUL = 0; // call handled successfully
+ public static final int RESULT_HANDLER_NOT_EXIST = 1; // presumably: no handler found for the command
+ public static final int RESULT_HANDLER_EXCEPTION = 2; // presumably: the handler threw an exception
+
+ // tag and start tick, presumably copied from the matching request - TODO confirm
+ private long requestTag;
+ private long requestStartTick;
+
+ // one of the RESULT_* constants; defaults to 0 (RESULT_SUCCESSFUL) until set
+ private int result;
+ // name of the command this response answers
+ private String command;
+ // call result in serialized (string) form
+ private String serializedResult;
+
+ /** Creates a response with tag 0 and start tick 0. */
+ public RpcCallResponsePdu() {
+ requestTag = 0;
+ requestStartTick = 0;
+ }
+
+ /** @return the request tag */
+ public long getRequestTag() {
+ return requestTag;
+ }
+
+ /** @param requestTag the request tag */
+ public void setRequestTag(long requestTag) {
+ this.requestTag = requestTag;
+ }
+
+ /** @return the request start tick */
+ public long getRequestStartTick() {
+ return requestStartTick;
+ }
+
+ /** @param requestStartTick the request start tick */
+ public void setRequestStartTick(long requestStartTick) {
+ this.requestStartTick = requestStartTick;
+ }
+
+ /** @return the result code, one of the RESULT_* constants */
+ public int getResult() {
+ return result;
+ }
+
+ /** @param result the result code, one of the RESULT_* constants */
+ public void setResult(int result) {
+ this.result = result;
+ }
+
+ /** @return the command name */
+ public String getCommand() {
+ return command;
+ }
+
+ /** @param command the command name */
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ /** @return the call result in serialized form */
+ public String getSerializedResult() {
+ return serializedResult;
+ }
+
+ /** @param serializedResult the call result in serialized form */
+ public void setSerializedResult(String serializedResult) {
+ this.serializedResult = serializedResult;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java
new file mode 100644
index 0000000..c787e6a
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackDispatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Routes a completed {@link RpcClientCall} to the method of a target object that is
+ * annotated with {@link RpcCallbackHandler} for the call's command.
+ * Resolved handler methods are cached per target class.
+ */
+public class RpcCallbackDispatcher {
+
+    // target class -> (command -> handler method); every access is guarded by the map's own monitor
+    private static Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
+
+    /**
+     * Invokes the callback handler that {@code target} declares for the command of
+     * {@code clientCall}, passing {@code clientCall} as the only argument.
+     *
+     * @param target object declaring the annotated handler method, must not be null
+     * @param clientCall the call being reported, must not be null
+     * @return true if a handler was found and invoked, false if the target declares none
+     * @throws RpcException if the handler cannot be invoked or throws; the original
+     *         reflection exception is attached as the cause
+     */
+    public static boolean dispatch(Object target, RpcClientCall clientCall) {
+        assert(clientCall != null);
+        assert(target != null);
+
+        Method handler = resolveHandler(target.getClass(), clientCall.getCommand());
+        if(handler == null)
+            return false;
+
+        try {
+            handler.invoke(target, clientCall);
+        } catch (IllegalArgumentException e) {
+            // keep the cause so the signature mismatch can be diagnosed from the stack trace
+            throw new RpcException("IllegalArgumentException when invoking RPC callback for command: " + clientCall.getCommand(), e);
+        } catch (IllegalAccessException e) {
+            throw new RpcException("IllegalAccessException when invoking RPC callback for command: " + clientCall.getCommand(), e);
+        } catch (InvocationTargetException e) {
+            // e wraps whatever the handler itself threw; chaining it keeps that exception reachable
+            throw new RpcException("InvocationTargetException when invoking RPC callback for command: " + clientCall.getCommand(), e);
+        }
+
+        return true;
+    }
+
+    /**
+     * Finds the method declared directly on {@code handlerClz} (inherited methods are not
+     * searched) whose {@link RpcCallbackHandler} annotation names {@code command}.
+     * A match is made accessible and cached for later lookups.
+     *
+     * @param handlerClz class to search
+     * @param command command name to match against the annotation
+     * @return the handler method, or null if the class declares none for the command
+     */
+    public static Method resolveHandler(Class<?> handlerClz, String command) {
+        synchronized(s_handlerCache) {
+            Map<String, Method> handlerMap = getAndSetHandlerMap(handlerClz);
+
+            Method handler = handlerMap.get(command);
+            if(handler != null)
+                return handler;
+
+            for(Method method : handlerClz.getDeclaredMethods()) {
+                RpcCallbackHandler annotation = method.getAnnotation(RpcCallbackHandler.class);
+                if(annotation != null) {
+                    if(annotation.command().equals(command)) {
+                        // handlers may be private, open them up for reflection
+                        method.setAccessible(true);
+                        handlerMap.put(command, method);
+                        return method;
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the per-class handler map, creating and registering an empty one on first use.
+     */
+    private static Map<String, Method> getAndSetHandlerMap(Class<?> handlerClz) {
+        Map<String, Method> handlerMap;
+        synchronized(s_handlerCache) {
+            handlerMap = s_handlerCache.get(handlerClz);
+
+            if(handlerMap == null) {
+                handlerMap = new HashMap<String, Method>();
+                s_handlerCache.put(handlerClz, handlerMap);
+            }
+        }
+
+        return handlerMap;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java
new file mode 100644
index 0000000..86dfceb
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method as the callback handler for one RPC command.
+ * RpcCallbackDispatcher looks the annotation up at runtime (hence RUNTIME retention)
+ * among the declared methods of the dispatch target and invokes the match with the
+ * RpcClientCall as its single argument.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface RpcCallbackHandler {
+ /** @return the command name this method handles; compared with equals() against the call's command */
+ String command();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java
new file mode 100644
index 0000000..0ab94ac
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcCallbackListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * Listener notified when an RPC call completes. Listeners are registered through
+ * RpcClientCall.addCallbackListener().
+ *
+ * @param <T> type of the result object the listener expects
+ */
+public interface RpcCallbackListener<T> {
+ /**
+ * Called when the call completed with a result.
+ *
+ * @param result the result object; RpcClientCallImpl passes the deserialized response here
+ */
+ void onSuccess(T result);
+ /**
+ * Called when the call completed with a failure.
+ *
+ * @param e the exception describing the failure
+ */
+ void onFailure(RpcException e);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java
new file mode 100644
index 0000000..7a7e45c
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCall.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * Client-side handle of a single RPC call. The setters return the call itself so that
+ * a call can be configured in a chain and then issued with apply().
+ */
+public interface RpcClientCall {
+ // timeout in milliseconds used when setTimeout() is not called
+ final static int DEFAULT_RPC_TIMEOUT = 10000;
+
+ /** @return the command name of this call */
+ String getCommand();
+ /** Sets the command name. @return this call */
+ RpcClientCall setCommand(String cmd);
+ /** Sets the call timeout in milliseconds. @return this call */
+ RpcClientCall setTimeout(int timeoutMilliseconds);
+
+ /** Sets the argument object sent with the command. @return this call */
+ RpcClientCall setCommandArg(Object arg);
+ /** @return the argument object sent with the command */
+ Object getCommandArg();
+
+ /** Stores a caller-side value under the given key on this call. @return this call */
+ RpcClientCall setContextParam(String key, Object param);
+ /** @return the value stored under the key by setContextParam(), cast to the requested type */
+ <T> T getContextParam(String key);
+
+ /** Registers a listener to be notified on completion. @return this call */
+ <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener);
+ /**
+ * Sets the object whose RpcCallbackHandler-annotated method receives the completion.
+ * In RpcClientCallImpl the target is only used when no listener has been added.
+ * @return this call
+ */
+ RpcClientCall setCallbackDispatcherTarget(Object target);
+
+ /**
+ * Marks the call as one-way. In RpcClientCallImpl a one-way call is not registered
+ * with the provider and get() returns null for it.
+ * @return this call
+ */
+ RpcClientCall setOneway();
+
+ /** Issues the call. @return this call */
+ RpcClientCall apply();
+ /** Cancels the call. */
+ void cancel();
+
+ /**
+ * @return the result object, it may also throw RpcException to indicate RPC failures
+ */
+ <T> T get();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java
new file mode 100644
index 0000000..1db878d
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcClientCallImpl.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class RpcClientCallImpl implements RpcClientCall {
+
+ private String _command;
+ private Object _commandArg;
+
+ private int _timeoutMilliseconds = DEFAULT_RPC_TIMEOUT;
+ private Map<String, Object> _contextParams = new HashMap<String, Object>();
+ private boolean _oneway = false;
+
+ @SuppressWarnings("rawtypes")
+ private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
+ private Object _callbackDispatcherTarget;
+
+ private RpcProvider _rpcProvider;
+ private long _startTickInMs;
+ private long _callTag;
+ private String _sourceAddress;
+ private String _targetAddress;
+
+ private Object _responseLock = new Object();
+ private boolean _responseDone = false;;
+ private Object _responseResult;
+
+ public RpcClientCallImpl(RpcProvider rpcProvider) {
+ assert(rpcProvider != null);
+ _rpcProvider = rpcProvider;
+ }
+
+ @Override
+ public String getCommand() {
+ return _command;
+ }
+
+ @Override
+ public RpcClientCall setCommand(String cmd) {
+ _command = cmd;
+ return this;
+ }
+
+ @Override
+ public RpcClientCall setTimeout(int timeoutMilliseconds) {
+ _timeoutMilliseconds = timeoutMilliseconds;
+ return this;
+ }
+
+ @Override
+ public RpcClientCall setCommandArg(Object arg) {
+ _commandArg = arg;
+ return this;
+ }
+
+ @Override
+ public Object getCommandArg() {
+ return _commandArg;
+ }
+
+ @Override
+ public RpcClientCall setContextParam(String key, Object param) {
+ assert(key != null);
+ _contextParams.put(key, param);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getContextParam(String key) {
+ return (T)_contextParams.get(key);
+ }
+
+ @Override
+ public <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener) {
+ assert(listener != null);
+ _callbackListeners.add(listener);
+ return this;
+ }
+
+ @Override
+ public RpcClientCall setCallbackDispatcherTarget(Object target) {
+ _callbackDispatcherTarget = target;
+ return this;
+ }
+
+
+ @Override
+ public RpcClientCall setOneway() {
+ _oneway = true;
+ return this;
+ }
+
+ public String getSourceAddress() {
+ return _sourceAddress;
+ }
+
+ public void setSourceAddress(String sourceAddress) {
+ _sourceAddress = sourceAddress;
+ }
+
+ public String getTargetAddress() {
+ return _targetAddress;
+ }
+
+ public void setTargetAddress(String targetAddress) {
+ _targetAddress = targetAddress;
+ }
+
+ public long getCallTag() {
+ return _callTag;
+ }
+
+ public void setCallTag(long callTag) {
+ _callTag = callTag;
+ }
+
+ @Override
+ public RpcClientCall apply() {
+ // sanity check
+ assert(_sourceAddress != null);
+ assert(_targetAddress != null);
+
+ if(!_oneway)
+ _rpcProvider.registerCall(this);
+
+ RpcCallRequestPdu pdu = new RpcCallRequestPdu();
+ pdu.setCommand(getCommand());
+ if(_commandArg != null)
+ pdu.setSerializedCommandArg(_rpcProvider.getMessageSerializer().serializeTo(_commandArg.getClass(), _commandArg));
+ pdu.setRequestTag(this.getCallTag());
+
+ _rpcProvider.sendRpcPdu(getSourceAddress(), getTargetAddress(),
+ _rpcProvider.getMessageSerializer().serializeTo(RpcCallRequestPdu.class, pdu));
+
+ return this;
+ }
+
+ @Override
+ public void cancel() {
+ _rpcProvider.cancelCall(this);
+ }
+
+ @Override
+ public <T> T get() {
+ if(!_oneway) {
+ synchronized(_responseLock) {
+ if(!_responseDone) {
+ long timeToWait = _timeoutMilliseconds - (System.currentTimeMillis() - _startTickInMs);
+ if(timeToWait < 0)
+ timeToWait = 0;
+
+ try {
+ _responseLock.wait(timeToWait);
+ } catch (InterruptedException e) {
+ throw new RpcTimeoutException("RPC call timed out");
+ }
+ }
+
+ assert(_responseDone);
+
+ if(_responseResult == null)
+ return null;
+
+ if(_responseResult instanceof RpcException)
+ throw (RpcException)_responseResult;
+
+ assert(_rpcProvider.getMessageSerializer() != null);
+ assert(_responseResult instanceof String);
+ return _rpcProvider.getMessageSerializer().serializeFrom((String)_responseResult);
+ }
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void complete(String result) {
+ _responseResult = result;
+
+ synchronized(_responseLock) {
+ _responseDone = true;
+ _responseLock.notifyAll();
+ }
+
+ if(_callbackListeners.size() > 0) {
+ assert(_rpcProvider.getMessageSerializer() != null);
+ Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
+ for(@SuppressWarnings("rawtypes") RpcCallbackListener listener: _callbackListeners)
+ listener.onSuccess(resultObject);
+ } else {
+ if(_callbackDispatcherTarget != null)
+ RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+ }
+ }
+
+ public void complete(RpcException e) {
+ _responseResult = e;
+
+ synchronized(_responseLock) {
+ _responseDone = true;
+
+ _responseLock.notifyAll();
+ }
+
+ if(_callbackListeners.size() > 0) {
+ for(@SuppressWarnings("rawtypes") RpcCallbackListener listener: _callbackListeners)
+ listener.onFailure(e);
+ } else {
+ if(_callbackDispatcherTarget != null)
+ RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java
new file mode 100644
index 0000000..618e6ab
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+/**
+ * Unchecked exception reported for RPC failures. RpcClientCall.get() rethrows it and
+ * RpcCallbackListener.onFailure() receives it.
+ */
+public class RpcException extends RuntimeException {
+ private static final long serialVersionUID = -3164514701087423787L;
+
+ /** Creates an exception without message or cause. */
+ public RpcException() {
+ super();
+ }
+
+ /** @param message description of the failure */
+ public RpcException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of the failure
+ * @param cause the underlying exception
+ */
+ public RpcException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java
new file mode 100644
index 0000000..8479e38
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcIOException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * RpcException subtype.
+ *
+ * NOTE(review): judging by the name this is meant for I/O level failures of an
+ * RPC call; no code that throws it is visible here, confirm against the callers.
+ */
+public class RpcIOException extends RpcException {
+
+ private static final long serialVersionUID = -6108039302920641533L;
+
+ /** Creates an exception without detail message and without cause. */
+ public RpcIOException() {
+ super();
+ }
+
+ /**
+ * @param message detail message describing the failure
+ */
+ public RpcIOException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message detail message describing the failure
+ * @param cause underlying exception that led to this failure
+ */
+ public RpcIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java
new file mode 100644
index 0000000..fb4f04b
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportAddressMapper;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+/**
+ * Contract of the RPC layer. A provider is itself a TransportMultiplexier, keeps
+ * the registered RpcServiceEndpoint handlers and creates/tracks RpcClientCall
+ * objects for outgoing calls.
+ */
+public interface RpcProvider extends TransportMultiplexier {
+ // multiplexer name used for RPC PDUs on the transport
+ final static String RPC_MULTIPLEXIER = "rpc";
+
+ // serializer used for RPC PDUs and their payloads
+ void setMessageSerializer(MessageSerializer messageSerializer);
+ MessageSerializer getMessageSerializer();
+ // returns true on success; RpcProviderImpl returns false when it has no transport provider
+ boolean initialize();
+
+ // add / remove an endpoint that is offered incoming calls
+ void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
+ // NOTE(review): "unregiste" is a typo of "unregister"; renaming would break implementers and callers
+ void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
+
+ // factory methods for outgoing calls; the no-arg form picks a default target
+ // (RpcProviderImpl uses the local predefined "RpcProvider" address)
+ RpcClientCall newCall();
+ RpcClientCall newCall(String targetAddress);
+ RpcClientCall newCall(TransportAddressMapper targetAddress);
+
+ //
+ // low-level public API
+ //
+ // track a call so that a later response can be matched to it
+ void registerCall(RpcClientCall call);
+ // stop tracking a call; RpcProviderImpl also completes it with an RpcException
+ void cancelCall(RpcClientCall call);
+
+ // send an already serialized PDU from sourceAddress to targetAddress
+ void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java
new file mode 100644
index 0000000..7f73e60
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcProviderImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportAddress;
+import org.apache.cloudstack.framework.transport.TransportAddressMapper;
+import org.apache.cloudstack.framework.transport.TransportEndpoint;
+import org.apache.cloudstack.framework.transport.TransportEndpointSite;
+import org.apache.cloudstack.framework.transport.TransportProvider;
+
+/**
+ * RpcProvider implementation on top of a TransportProvider.
+ *
+ * On initialize() it attaches a private transport endpoint under the name
+ * "RpcProvider" and registers itself for the "rpc" multiplexer. Incoming request
+ * PDUs are offered to the registered RpcServiceEndpoint objects in registration
+ * order; incoming response PDUs complete the outstanding RpcClientCall that has
+ * the same call tag.
+ */
+public class RpcProviderImpl implements RpcProvider {
+    public static final String RPC_MULTIPLEXIER = "rpc";
+
+    private TransportProvider _transportProvider;
+    // set when the transport confirms the attach, cleared on detach
+    private String _transportAddress;
+    private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint(); // transport attachment at RPC layer
+
+    private MessageSerializer _messageSerializer;
+    // accessed only while synchronized on the list itself
+    private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
+    // call tag -> call waiting for its response; accessed only while synchronized on this
+    private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
+
+    private long _nextCallTag = System.currentTimeMillis();
+
+    public RpcProviderImpl() {
+    }
+
+    public RpcProviderImpl(TransportProvider transportProvider) {
+        _transportProvider = transportProvider;
+    }
+
+    public TransportProvider getTransportProvider() {
+        return _transportProvider;
+    }
+
+    public void setTransportProvider(TransportProvider transportProvider) {
+        _transportProvider = transportProvider;
+    }
+
+    /**
+     * Entry point for messages of the "rpc" multiplexer: deserializes the message
+     * and routes it to the request or response handling.
+     */
+    @Override
+    public void onTransportMessage(String senderEndpointAddress,
+        String targetEndpointAddress, String multiplexer, String message) {
+        assert(_messageSerializer != null);
+
+        Object pdu = _messageSerializer.serializeFrom(message);
+        if(pdu instanceof RpcCallRequestPdu) {
+            handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu);
+        } else if(pdu instanceof RpcCallResponsePdu) {
+            handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu);
+        } else {
+            // not an RPC PDU; ignored when assertions are disabled
+            assert(false);
+        }
+    }
+
+    @Override
+    public void setMessageSerializer(MessageSerializer messageSerializer) {
+        assert(messageSerializer != null);
+        _messageSerializer = messageSerializer;
+    }
+
+    @Override
+    public MessageSerializer getMessageSerializer() {
+        return _messageSerializer;
+    }
+
+    /**
+     * Attaches to the transport and registers this provider for the "rpc" multiplexer.
+     *
+     * @return false when no transport provider has been set, true otherwise
+     */
+    @Override
+    public boolean initialize() {
+        assert(_transportProvider != null);
+        if(_transportProvider == null)
+            return false;
+
+        TransportEndpointSite endpointSite = _transportProvider.attach(_transportEndpoint, "RpcProvider");
+        endpointSite.registerMultiplexier(RPC_MULTIPLEXIER, this);
+        return true;
+    }
+
+    @Override
+    public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
+        synchronized(_serviceEndpoints) {
+            _serviceEndpoints.add(rpcEndpoint);
+        }
+    }
+
+    @Override
+    public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
+        synchronized(_serviceEndpoints) {
+            _serviceEndpoints.remove(rpcEndpoint);
+        }
+    }
+
+    /** Creates a call addressed to the local predefined "RpcProvider" transport address. */
+    @Override
+    public RpcClientCall newCall() {
+        return newCall(TransportAddress.getLocalPredefinedTransportAddress("RpcProvider").toString());
+    }
+
+    /**
+     * Creates a call with a fresh call tag, this provider's transport address as
+     * source and the given target. The call is not registered as outstanding here.
+     */
+    @Override
+    public RpcClientCall newCall(String targetAddress) {
+        long callTag = getNextCallTag();
+        RpcClientCallImpl call = new RpcClientCallImpl(this);
+        call.setSourceAddress(_transportAddress);
+        call.setTargetAddress(targetAddress);
+        call.setCallTag(callTag);
+
+        return call;
+    }
+
+    @Override
+    public RpcClientCall newCall(TransportAddressMapper targetAddress) {
+        return newCall(targetAddress.getAddress());
+    }
+
+    @Override
+    public void registerCall(RpcClientCall call) {
+        assert(call != null);
+        synchronized(this) {
+            _outstandingCalls.put(((RpcClientCallImpl)call).getCallTag(), call);
+        }
+    }
+
+    /** Removes the call from the outstanding set and completes it with an RpcException. */
+    @Override
+    public void cancelCall(RpcClientCall call) {
+        synchronized(this) {
+            _outstandingCalls.remove(((RpcClientCallImpl)call).getCallTag());
+        }
+
+        // completed outside of the lock so that call callbacks do not run under it
+        ((RpcClientCallImpl)call).complete(new RpcException("Call is cancelled"));
+    }
+
+    @Override
+    public void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu) {
+        assert(_transportProvider != null);
+        _transportProvider.sendMessage(sourceAddress, targetAddress, RpcProvider.RPC_MULTIPLEXIER, serializedPdu);
+    }
+
+    /**
+     * Hands out the next call tag. The value 0 is never returned.
+     *
+     * @return a tag that differs from the previously returned one
+     */
+    protected synchronized long getNextCallTag() {
+        long tag = _nextCallTag++;
+        if(tag == 0) {
+            // take the next counter value instead of bumping only the local copy,
+            // otherwise the following caller would be handed the very same tag
+            tag = _nextCallTag++;
+        }
+
+        return tag;
+    }
+
+    /**
+     * Offers an incoming request to the registered endpoints. When no endpoint
+     * handles it, or an endpoint throws, a response PDU carrying the matching
+     * result code is sent back to the requester.
+     */
+    private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) {
+        try {
+            RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
+
+            // TODO, we are trying to avoid locking when calling into callbacks
+            // this can be optimized later
+            List<RpcServiceEndpoint> endpoints;
+            synchronized(_serviceEndpoints) {
+                endpoints = new ArrayList<RpcServiceEndpoint>(_serviceEndpoints);
+            }
+
+            for(RpcServiceEndpoint endpoint : endpoints) {
+                if(endpoint.onCallReceive(call))
+                    return;
+            }
+
+            RpcCallResponsePdu responsePdu = newResponsePdu(pdu);
+            responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST);
+            sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+
+        } catch (Throwable e) {
+            // the failure is reported to the remote caller as a result code only
+            RpcCallResponsePdu responsePdu = newResponsePdu(pdu);
+            responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION);
+            sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu));
+        }
+    }
+
+    /** Builds a response PDU that echoes command, start tick and tag of the request; the result is left to the caller. */
+    private RpcCallResponsePdu newResponsePdu(RpcCallRequestPdu requestPdu) {
+        RpcCallResponsePdu responsePdu = new RpcCallResponsePdu();
+        responsePdu.setCommand(requestPdu.getCommand());
+        responsePdu.setRequestStartTick(requestPdu.getRequestStartTick());
+        responsePdu.setRequestTag(requestPdu.getRequestTag());
+        return responsePdu;
+    }
+
+    /**
+     * Completes the outstanding call that matches the response tag. Responses
+     * without a matching outstanding call are dropped.
+     */
+    private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) {
+        RpcClientCallImpl call = null;
+
+        synchronized(this) {
+            call = (RpcClientCallImpl)_outstandingCalls.remove(pdu.getRequestTag());
+        }
+
+        if(call != null) {
+            switch(pdu.getResult()) {
+            case RpcCallResponsePdu.RESULT_SUCCESSFUL :
+                call.complete(pdu.getSerializedResult());
+                break;
+
+            case RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST :
+                call.complete(new RpcException("Handler does not exist"));
+                break;
+
+            case RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION :
+                call.complete(new RpcException("Exception in handler"));
+                break;
+
+            default :
+                assert(false);
+                // the call has already been removed from the outstanding map, so it
+                // has to be completed here or its caller would never be notified
+                call.complete(new RpcException("Unknown RPC result code: " + pdu.getResult()));
+                break;
+            }
+        }
+    }
+
+    /** Transport endpoint of the RPC layer; it only tracks the address assigned by the transport. */
+    private class RpcTransportEndpoint implements TransportEndpoint {
+
+        @Override
+        public void onTransportMessage(String senderEndpointAddress,
+            String targetEndpointAddress, String multiplexer, String message) {
+
+            // we won't handle generic transport message toward RPC transport endpoint
+        }
+
+        @Override
+        public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
+            if(bSuccess)
+                _transportAddress = endpointAddress;
+        }
+
+        @Override
+        public void onDetachIndication(String endpointAddress) {
+            if(_transportAddress != null && _transportAddress.equals(endpointAddress))
+                _transportAddress = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java
new file mode 100644
index 0000000..a102503
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCall.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+/**
+ * Receiver-side view of one incoming RPC call, as handed to RpcServiceEndpoint.onCallReceive.
+ */
+public interface RpcServerCall {
+ // command name carried by the request
+ String getCommand();
+ // deserialized command argument; RpcServerCallImpl returns null when the request carries none
+ <T> T getCommandArgument();
+
+ // called by the receiver to respond to the call; RpcServerCallImpl accepts a null
+ // returnObject and then sends a response without serialized result
+ void completeCall(Object returnObject);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java
new file mode 100644
index 0000000..d1ac7a9
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServerCallImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * RpcServerCall backed by a received RpcCallRequestPdu.
+ *
+ * It remembers the addresses the request travelled between, so that the
+ * response can be sent back in the opposite direction through the provider.
+ */
+public class RpcServerCallImpl implements RpcServerCall {
+
+    private final RpcProvider _rpcProvider;
+    private final String _sourceAddress;    // address the request came from
+    private final String _targetAddress;    // address the request was sent to
+
+    private final RpcCallRequestPdu _requestPdu;
+
+    /**
+     * @param provider provider used for (de)serialization and for sending the response
+     * @param sourceAddress address of the requester
+     * @param targetAddress address the request was delivered to
+     * @param requestPdu the received request
+     */
+    public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress,
+        RpcCallRequestPdu requestPdu) {
+
+        _rpcProvider = provider;
+        _sourceAddress = sourceAddress;
+        _targetAddress = targetAddress;
+        _requestPdu = requestPdu;
+    }
+
+    @Override
+    public String getCommand() {
+        assert(_requestPdu != null);
+        return _requestPdu.getCommand();
+    }
+
+    /**
+     * @return the deserialized command argument, or null when the request carries none
+     */
+    @Override
+    public <T> T getCommandArgument() {
+        if(_requestPdu.getSerializedCommandArg() == null)
+            return null;
+
+        assert(_rpcProvider.getMessageSerializer() != null);
+        return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg());
+    }
+
+    /**
+     * Sends a successful response for this call back to the requester.
+     *
+     * @param returnObject result of the call; when null the response carries no serialized result
+     */
+    @Override
+    public void completeCall(Object returnObject) {
+        assert(_sourceAddress != null);
+        assert(_targetAddress != null);
+
+        RpcCallResponsePdu pdu = new RpcCallResponsePdu();
+        pdu.setCommand(_requestPdu.getCommand());
+        pdu.setRequestTag(_requestPdu.getRequestTag());
+        pdu.setRequestStartTick(_requestPdu.getRequestStartTick());
+        // the result code goes into the result field; it used to be written into the
+        // start tick, which lost the tick and left the result unset
+        pdu.setResult(RpcCallResponsePdu.RESULT_SUCCESSFUL);
+        if(returnObject != null) {
+            assert(_rpcProvider.getMessageSerializer() != null);
+            pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject));
+        }
+
+        // source and target are swapped: the response goes back to the requester
+        _rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress,
+            _rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java
new file mode 100644
index 0000000..c0d1566
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceDispatcher.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * RpcServiceEndpoint that forwards incoming calls to methods of a target object
+ * annotated with RpcServiceHandler, matching the annotation's command against
+ * the command of the call.
+ *
+ * Resolved handler methods are cached per target class; dispatcher instances
+ * obtained through getDispatcher() are cached per target object.
+ */
+public class RpcServiceDispatcher implements RpcServiceEndpoint {
+
+    // target class -> (command -> handler method); accessed only while synchronized on the map
+    private static Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
+
+    // target object -> its dispatcher; accessed only while synchronized on the map
+    private static Map<Object, RpcServiceDispatcher> s_targetMap = new HashMap<Object, RpcServiceDispatcher>();
+    private Object _targetObject;
+
+    public RpcServiceDispatcher(Object targetObject) {
+        _targetObject = targetObject;
+    }
+
+    /**
+     * Returns the cached dispatcher of the target object, creating and caching it on first use.
+     */
+    public static RpcServiceDispatcher getDispatcher(Object targetObject) {
+        RpcServiceDispatcher dispatcher;
+        synchronized(s_targetMap) {
+            dispatcher = s_targetMap.get(targetObject);
+            if(dispatcher == null) {
+                dispatcher = new RpcServiceDispatcher(targetObject);
+                s_targetMap.put(targetObject, dispatcher);
+            }
+        }
+        return dispatcher;
+    }
+
+    /** Drops the cached dispatcher of the target object, if there is one. */
+    public static void removeDispatcher(Object targetObject) {
+        synchronized(s_targetMap) {
+            s_targetMap.remove(targetObject);
+        }
+    }
+
+    /**
+     * Invokes the handler method of the target that is registered for the call's command.
+     *
+     * @param target object whose class declares the handler methods
+     * @param serviceCall the incoming call, passed to the handler as its only argument
+     * @return true when a handler was found and invoked, false when the target has no handler for the command
+     * @throws RpcException when the reflective invocation fails or the handler itself throws;
+     *         the original exception is attached as cause
+     */
+    public static boolean dispatch(Object target, RpcServerCall serviceCall) {
+        assert(serviceCall != null);
+        assert(target != null);
+
+        Method handler = resolveHandler(target.getClass(), serviceCall.getCommand());
+        if(handler == null)
+            return false;
+
+        try {
+            handler.invoke(target, serviceCall);
+        } catch (IllegalArgumentException e) {
+            throw new RpcException("IllegalArgumentException when invoking RPC service command: " + serviceCall.getCommand(), e);
+        } catch (IllegalAccessException e) {
+            throw new RpcException("IllegalAccessException when invoking RPC service command: " + serviceCall.getCommand(), e);
+        } catch (InvocationTargetException e) {
+            // e wraps whatever the handler threw; keeping it as cause preserves that stack trace
+            throw new RpcException("InvocationTargetException when invoking RPC service command: " + serviceCall.getCommand(), e);
+        }
+
+        return true;
+    }
+
+    /**
+     * Finds the method of the class that is annotated with RpcServiceHandler for the command.
+     * Only methods declared by the class itself are examined. A found method is made
+     * accessible and cached; a miss is not cached.
+     *
+     * @return the handler method, or null when the class declares none for the command
+     */
+    public static Method resolveHandler(Class<?> handlerClz, String command) {
+        synchronized(s_handlerCache) {
+            Map<String, Method> handlerMap = getAndSetHandlerMap(handlerClz);
+
+            Method handler = handlerMap.get(command);
+            if(handler != null)
+                return handler;
+
+            for(Method method : handlerClz.getDeclaredMethods()) {
+                RpcServiceHandler annotation = method.getAnnotation(RpcServiceHandler.class);
+                if(annotation != null) {
+                    if(annotation.command().equals(command)) {
+                        method.setAccessible(true);
+                        handlerMap.put(command, method);
+                        return method;
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /** Returns the command-to-method map of the class, creating an empty one on first use. */
+    private static Map<String, Method> getAndSetHandlerMap(Class<?> handlerClz) {
+        Map<String, Method> handlerMap;
+        synchronized(s_handlerCache) {
+            handlerMap = s_handlerCache.get(handlerClz);
+
+            if(handlerMap == null) {
+                handlerMap = new HashMap<String, Method>();
+                s_handlerCache.put(handlerClz, handlerMap);
+            }
+        }
+
+        return handlerMap;
+    }
+
+    @Override
+    public boolean onCallReceive(RpcServerCall call) {
+        return dispatch(_targetObject, call);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java
new file mode 100644
index 0000000..31dc083
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceEndpoint.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * Receiver of incoming RPC calls; instances are registered with an RpcProvider.
+ */
+public interface RpcServiceEndpoint {
+ /**
+ * Offers an incoming call to this endpoint.
+ *
+ * @param call the incoming call
+ * @return
+ * true call has been handled
+ * false can not find the call handler
+ * @throws RpcException
+ * when the call handler fails while processing the call
+ * (RpcServiceDispatcher throws it when invoking the handler method fails)
+ */
+ boolean onCallReceive(RpcServerCall call);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java
new file mode 100644
index 0000000..6a77f93
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcServiceHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method as the handler of one RPC command. The annotation is retained at
+ * runtime; RpcServiceDispatcher looks it up reflectively and invokes the method
+ * with the RpcServerCall as its only argument.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface RpcServiceHandler {
+ // command name this method handles; compared with equals() against the command of the call
+ String command();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java
new file mode 100644
index 0000000..5c876c7
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/rpc/RpcTimeoutException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.rpc;
+
+
+/**
+ * RpcException subtype.
+ *
+ * NOTE(review): judging by the name this signals an RPC call that was not answered
+ * in time; no code that throws it is visible here, confirm against the callers.
+ */
+public class RpcTimeoutException extends RpcException {
+
+    private static final long serialVersionUID = -3618654987984665833L;
+
+    /** Creates an exception without detail message and without cause. */
+    public RpcTimeoutException() {
+        super();
+    }
+
+    /**
+     * @param message detail message describing the failure
+     */
+    public RpcTimeoutException(String message) {
+        super(message);
+    }
+
+    /**
+     * Same constructor shape as offered by RpcException and RpcIOException, so that
+     * a timeout can be reported without losing the underlying exception.
+     *
+     * @param message detail message describing the failure
+     * @param cause underlying exception that led to this failure
+     */
+    public RpcTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java
new file mode 100644
index 0000000..2fcab54
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/JsonMessageSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.serializer;
+
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * MessageSerializer that produces text of the form "onwireName|json": the name
+ * comes from the OnwireName annotation of the class, the JSON part from Gson.
+ * Reading resolves the name back to a class through the OnwireClassRegistry.
+ */
+public class JsonMessageSerializer implements MessageSerializer {
+
+    // this will be injected from external to allow installation of
+    // type adapters needed by upper layer applications
+    private Gson _gson;
+
+    // must be set through setOnwireClassRegistry() before serializeFrom() is used
+    private OnwireClassRegistry _clzRegistry;
+
+    /** Creates a serializer with a default Gson instance configured for version 1.5. */
+    public JsonMessageSerializer() {
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        gsonBuilder.setVersion(1.5);
+        _gson = gsonBuilder.create();
+    }
+
+    public Gson getGson() {
+        return _gson;
+    }
+
+    public void setGson(Gson gson) {
+        _gson = gson;
+    }
+
+    public OnwireClassRegistry getOnwireClassRegistry() {
+        return _clzRegistry;
+    }
+
+    public void setOnwireClassRegistry(OnwireClassRegistry clzRegistry) {
+        _clzRegistry = clzRegistry;
+    }
+
+    /**
+     * Serializes the object as "onwireName|json".
+     *
+     * @param clz class whose OnwireName annotation supplies the name prefix
+     * @param object object to serialize
+     * @return the on-wire text
+     * @throws RuntimeException when clz carries no OnwireName annotation
+     */
+    @Override
+    public <T> String serializeTo(Class<?> clz, T object) {
+        assert(clz != null);
+        assert(object != null);
+
+        OnwireName onwire = clz.getAnnotation(OnwireName.class);
+        if(onwire == null)
+            throw new RuntimeException("Class " + clz.getCanonicalName() + " is not declared to be onwire");
+
+        // plain concatenation; a synchronized StringBuffer is not needed for a local value
+        return onwire.name() + "|" + _gson.toJson(object);
+    }
+
+    /**
+     * Parses text of the form "onwireName|json" back into an object.
+     *
+     * @param message the on-wire text; everything before the first '|' is the on-wire name
+     * @return the deserialized object, cast unchecked to the type the caller expects
+     * @throws IllegalStateException when no class registry has been set
+     * @throws RuntimeException when the message has no '|' separator or the name is not registered
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T serializeFrom(String message) {
+        assert(message != null);
+        int contentStartPos = message.indexOf('|');
+        if(contentStartPos < 0)
+            throw new RuntimeException("Invalid on-wire message format");
+
+        // report the missing configuration explicitly instead of failing with a bare NullPointerException
+        if(_clzRegistry == null)
+            throw new IllegalStateException("Onwire class registry is not set");
+
+        String onwireName = message.substring(0, contentStartPos);
+        Class<?> clz = _clzRegistry.getOnwireClass(onwireName);
+        if(clz == null)
+            throw new RuntimeException("Onwire class is not registered. name: " + onwireName);
+
+        return (T)_gson.fromJson(message.substring(contentStartPos + 1), clz);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java
new file mode 100644
index 0000000..65d818e
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/MessageSerializer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.serializer;
+
+/**
+ * Converts objects to and from their textual on-wire form.
+ */
+public interface MessageSerializer {
+ // serializes object; clz identifies the on-wire type
+ // (JsonMessageSerializer requires clz to carry an OnwireName annotation)
+ <T>String serializeTo(Class<?> clz, T object);
+ // rebuilds an object from serialized text; the result type T is chosen by the caller
+ <T> T serializeFrom(String message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java
new file mode 100644
index 0000000..ac9c6bc
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireClassRegistry.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.serializer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+
+
+//
+// The code for finding classes in a given package is adapted from:
+// Credit: http://internna.blogspot.com/2007/11/java-5-retrieving-all-classes-from.html
+//
+/**
+ * Registry that maps on-wire names to the Java classes carrying them.
+ *
+ * Packages are registered up front; {@link #scan()} then loads every class it
+ * can find under those packages and records each one annotated with
+ * {@link OnwireName} under the annotation's name. The name "Object" is always
+ * mapped to {@code Object.class}, whichever constructor is used.
+ */
+public class OnwireClassRegistry {
+
+    private List<String> packages = new ArrayList<String>();
+    private Map<String, Class<?>> registry = new HashMap<String, Class<?>>();
+
+    /** Creates a registry with no packages and only the built-in "Object" mapping. */
+    public OnwireClassRegistry() {
+        registry.put("Object", Object.class);
+    }
+
+    /**
+     * Creates a registry that will scan a single package.
+     *
+     * @param packageName dotted name of the package to scan
+     */
+    public OnwireClassRegistry(String packageName) {
+        // Chain to the default constructor so the "Object" mapping is present here too.
+        this();
+        addPackage(packageName);
+    }
+
+    /**
+     * Creates a registry that will scan the given packages.
+     *
+     * @param packages dotted package names; the list is copied, not retained
+     */
+    public OnwireClassRegistry(List<String> packages) {
+        this();
+        // Must target the field: the parameter shadows it, and adding the
+        // parameter to itself left the registry empty and doubled the caller's list.
+        if (packages != null) {
+            this.packages.addAll(packages);
+        }
+    }
+
+    /** @return the live list of package names that {@link #scan()} will walk */
+    public List<String> getPackages() {
+        return packages;
+    }
+
+    /**
+     * Replaces the list of packages to scan.
+     *
+     * @param packages the new list; it is stored as-is, not copied
+     */
+    public void setPackages(List<String> packages) {
+        this.packages = packages;
+    }
+
+    /**
+     * Adds one package to the scan list.
+     *
+     * @param packageName dotted name of the package to add
+     */
+    public void addPackage(String packageName) {
+        packages.add(packageName);
+    }
+
+    /**
+     * Loads the classes of every registered package and registers those
+     * annotated with {@link OnwireName}. A later class with the same on-wire
+     * name replaces an earlier one.
+     */
+    public void scan() {
+        Set<Class<?>> classes = new HashSet<Class<?>>();
+        for (String pkg : packages) {
+            classes.addAll(getClasses(pkg));
+        }
+
+        for (Class<?> clz : classes) {
+            OnwireName onwire = clz.getAnnotation(OnwireName.class);
+            if (onwire != null) {
+                assert (onwire.name() != null);
+
+                registry.put(onwire.name(), clz);
+            }
+        }
+    }
+
+    /**
+     * Looks up the class registered under an on-wire name.
+     *
+     * @param onwireName the name to look up
+     * @return the registered class, or {@code null} if the name is unknown
+     */
+    public Class<?> getOnwireClass(String onwireName) {
+        return registry.get(onwireName);
+    }
+
+    /**
+     * Finds the classes of a package using the current thread's context class
+     * loader, falling back to this class's own loader when the thread has none.
+     *
+     * @param packageName dotted package name
+     * @return the classes that could be loaded; never {@code null}
+     */
+    static Set<Class<?>> getClasses(String packageName) {
+        ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        if (loader == null) {
+            // The context class loader is optional; without this fallback the lookup would NPE.
+            loader = OnwireClassRegistry.class.getClassLoader();
+        }
+        return getClasses(loader, packageName);
+    }
+
+    //
+    // Following helper methods can be put in a separated helper class,
+    // will do that later
+    //
+
+    /**
+     * Finds the classes of a package among the resources visible to {@code loader},
+     * looking both in plain directories and inside JAR files.
+     *
+     * The scan is best effort: if enumerating or reading a resource fails, the
+     * classes collected up to that point are returned.
+     *
+     * @param loader      class loader used to locate the package's resources
+     * @param packageName dotted package name
+     * @return the classes that could be loaded; never {@code null}
+     */
+    static Set<Class<?>> getClasses(ClassLoader loader, String packageName) {
+        Set<Class<?>> classes = new HashSet<Class<?>>();
+        String path = packageName.replace('.', '/');
+        try {
+            Enumeration<URL> resources = loader.getResources(path);
+            while (resources != null && resources.hasMoreElements()) {
+                String filePath = resources.nextElement().getFile();
+                // Check for null before touching the string (the original checked afterwards).
+                if (filePath == null) {
+                    continue;
+                }
+                // WINDOWS HACK: URL-encoded spaces must become real spaces for file access.
+                filePath = filePath.replace("%20", " ");
+
+                if ((filePath.indexOf("!") > 0) && (filePath.indexOf(".jar") > 0)) {
+                    // JAR resource: keep what lies between the scheme's ':' and the '!' separator.
+                    String jarPath = filePath.substring(0, filePath.indexOf("!"))
+                        .substring(filePath.indexOf(":") + 1);
+                    // WINDOWS HACK: drop the leading '/' in front of a drive letter ("/C:/...").
+                    if (jarPath.indexOf(":") >= 0) {
+                        jarPath = jarPath.substring(1);
+                    }
+                    classes.addAll(getFromJARFile(jarPath, path));
+                } else {
+                    classes.addAll(getFromDirectory(new File(filePath), packageName));
+                }
+            }
+        } catch (IOException e) {
+            // Deliberately ignored: scanning is best effort, return what was found so far.
+        } catch (ClassNotFoundException e) {
+            // Deliberately ignored: scanning is best effort, return what was found so far.
+        }
+        return classes;
+    }
+
+    /**
+     * Recursively loads the classes stored as {@code .class} files under a directory.
+     * Classes that cannot be loaded are skipped.
+     *
+     * @param directory   directory that corresponds to {@code packageName}
+     * @param packageName dotted package name matching {@code directory}
+     * @return the classes that could be loaded; empty if the directory is
+     *         missing or cannot be listed
+     * @throws ClassNotFoundException declared for callers; load failures are
+     *         skipped internally
+     */
+    static Set<Class<?>> getFromDirectory(File directory, String packageName) throws ClassNotFoundException {
+        Set<Class<?>> classes = new HashSet<Class<?>>();
+        // File.list() returns null for non-directories and on I/O errors.
+        String[] entries = directory.exists() ? directory.list() : null;
+        if (entries == null) {
+            return classes;
+        }
+
+        for (String file : entries) {
+            if (file.endsWith(".class")) {
+                String name = packageName + '.' + stripFilenameExtension(file);
+                try {
+                    classes.add(Class.forName(name));
+                } catch (ClassNotFoundException e) {
+                    // Skip classes that cannot be loaded.
+                } catch (NoClassDefFoundError e) {
+                    // Skip classes with missing dependencies, as the JAR scan does.
+                } catch (Exception e) {
+                    // Skip classes whose loading fails for any other reason.
+                }
+            } else {
+                File f = new File(directory.getPath() + "/" + file);
+                if (f.isDirectory()) {
+                    classes.addAll(getFromDirectory(f, packageName + "." + file));
+                }
+            }
+        }
+        return classes;
+    }
+
+    /**
+     * Loads the classes stored in a JAR file under a given package path.
+     * Classes that cannot be loaded are skipped. The JAR stream is always closed.
+     *
+     * @param jar         file system path of the JAR file
+     * @param packageName package prefix in path form, i.e. separated by '/'
+     *                    (entry names are compared against it before conversion to dots)
+     * @return the classes that could be loaded
+     * @throws IOException if the JAR cannot be opened or read
+     * @throws ClassNotFoundException declared for callers; load failures are
+     *         skipped internally
+     */
+    static Set<Class<?>> getFromJARFile(String jar, String packageName) throws IOException, ClassNotFoundException {
+        Set<Class<?>> classes = new HashSet<Class<?>>();
+        FileInputStream fileStream = new FileInputStream(jar);
+        try {
+            JarInputStream jarFile = new JarInputStream(fileStream);
+            try {
+                JarEntry jarEntry;
+                while ((jarEntry = jarFile.getNextJarEntry()) != null) {
+                    String className = jarEntry.getName();
+                    if (!className.endsWith(".class")) {
+                        continue;
+                    }
+                    className = stripFilenameExtension(className);
+                    if (!className.startsWith(packageName)) {
+                        continue;
+                    }
+                    try {
+                        classes.add(Class.forName(className.replace('/', '.')));
+                    } catch (ClassNotFoundException e) {
+                        // Skip classes that cannot be loaded.
+                    } catch (NoClassDefFoundError e) {
+                        // Skip classes with missing dependencies.
+                    }
+                }
+            } finally {
+                jarFile.close();
+            }
+        } finally {
+            // Covers the case where the JarInputStream constructor itself failed.
+            fileStream.close();
+        }
+
+        return classes;
+    }
+
+    /**
+     * Removes the last extension from a file name.
+     *
+     * @param file file name or path
+     * @return the text before the last '.', or {@code file} unchanged if it has no '.'
+     */
+    static String stripFilenameExtension(String file) {
+        int dot = file.lastIndexOf('.');
+        return dot < 0 ? file : file.substring(0, dot);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java
new file mode 100644
index 0000000..ac195d0
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/serializer/OnwireName.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.serializer;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares the name under which the annotated type is known on the wire.
+ *
+ * The annotation is kept at runtime so it can be read reflectively, as
+ * {@code OnwireClassRegistry.scan()} does.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface OnwireName {
+
+    /** The on-wire name assigned to the annotated type. */
+    String name();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
new file mode 100644
index 0000000..11bc428
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerEventBus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.server;
+
+import org.apache.cloudstack.framework.eventbus.EventBusBase;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+/**
+ * Event bus for the server side that also registers as a transport
+ * multiplexer. The transport callback is still a stub.
+ */
+public class ServerEventBus extends EventBusBase implements TransportMultiplexier {
+
+    /**
+     * Transport callback. Not implemented yet: the message is ignored.
+     *
+     * @param senderEndpointAddress address of the sending endpoint
+     * @param targetEndpointAddress address of the receiving endpoint
+     * @param multiplexer           multiplexer identifier carried with the message
+     * @param message               message content
+     */
+    @Override
+    public void onTransportMessage(String senderEndpointAddress, String targetEndpointAddress,
+            String multiplexer, String message) {
+        // TODO Auto-generated method stub
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java
new file mode 100644
index 0000000..d5dae2e
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/server/ServerTransportProvider.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.server;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportAddress;
+import org.apache.cloudstack.framework.transport.TransportDataPdu;
+import org.apache.cloudstack.framework.transport.TransportEndpoint;
+import org.apache.cloudstack.framework.transport.TransportEndpointSite;
+import org.apache.cloudstack.framework.transport.TransportPdu;
+import org.apache.cloudstack.framework.transport.TransportProvider;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.concurrency.NamedThreadFactory;
+
+/**
+ * In-process transport provider: endpoints attach to it to obtain an address,
+ * and messages addressed to a local endpoint are queued on that endpoint's
+ * site. Output processing for a site runs on a fixed-size worker pool that
+ * {@link #initialize()} creates.
+ *
+ * Cross-node forwarding is not implemented; PDUs addressed to another node
+ * are dropped with a warning.
+ */
+public class ServerTransportProvider implements TransportProvider {
+    private static final Logger s_logger = Logger.getLogger(ServerTransportProvider.class);
+
+    public static final int DEFAULT_WORKER_POOL_SIZE = 5;
+
+    private String _nodeId;
+
+    // Keyed by endpoint id; every access is guarded by synchronized(this).
+    private final Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
+    private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
+    private ExecutorService _executor;
+
+    // Random starting point for generated endpoint ids.
+    private int _nextEndpointId = new Random().nextInt();
+
+    private MessageSerializer _messageSerializer;
+
+    public ServerTransportProvider() {
+    }
+
+    /** @return the id of the node this provider serves; may be {@code null} if never set */
+    public String getNodeId() {
+        return _nodeId;
+    }
+
+    /**
+     * Sets the id of the node this provider serves.
+     *
+     * @param nodeId the node id
+     * @return this provider, for call chaining
+     */
+    public ServerTransportProvider setNodeId(String nodeId) {
+        _nodeId = nodeId;
+        return this;
+    }
+
+    /** @return the number of worker threads {@link #initialize()} will create */
+    public int getWorkerPoolSize() {
+        return _poolSize;
+    }
+
+    /**
+     * Sets the worker pool size; only takes effect if called before {@link #initialize()}.
+     *
+     * @param poolSize number of worker threads, expected to be positive
+     * @return this provider, for call chaining
+     */
+    public ServerTransportProvider setWorkerPoolSize(int poolSize) {
+        assert (poolSize > 0);
+
+        _poolSize = poolSize;
+        return this;
+    }
+
+    @Override
+    public void setMessageSerializer(MessageSerializer messageSerializer) {
+        assert (messageSerializer != null);
+        _messageSerializer = messageSerializer;
+    }
+
+    @Override
+    public MessageSerializer getMessageSerializer() {
+        return _messageSerializer;
+    }
+
+    /** Creates the worker pool. Must be called before {@link #requestSiteOutput}. */
+    public void initialize() {
+        _executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
+    }
+
+    /**
+     * Attaches an endpoint and returns its site.
+     *
+     * With a non-empty {@code predefinedAddress} that value becomes the endpoint
+     * id and the address magic is 0; otherwise an id is generated and the magic
+     * is random. If a site already exists under the resulting id, that site is
+     * returned as-is and the endpoint receives no attach confirmation.
+     *
+     * @param endpoint          the endpoint to attach
+     * @param predefinedAddress well-known endpoint id, or {@code null}/empty to generate one
+     * @return the site now associated with the endpoint id
+     */
+    @Override
+    public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
+        boolean predefined = predefinedAddress != null && !predefinedAddress.isEmpty();
+        String endpointId = predefined ? predefinedAddress : String.valueOf(getNextEndpointId());
+        TransportAddress transportAddress = predefined
+            ? new TransportAddress(_nodeId, endpointId, 0)
+            : new TransportAddress(_nodeId, endpointId);
+
+        TransportEndpointSite endpointSite;
+        synchronized (this) {
+            endpointSite = _endpointMap.get(endpointId);
+            if (endpointSite != null) {
+                // already attached
+                return endpointSite;
+            }
+            endpointSite = new TransportEndpointSite(this, endpoint, transportAddress);
+            _endpointMap.put(endpointId, endpointSite);
+        }
+
+        // Notify outside the lock so the callback cannot block other attach/detach calls.
+        endpoint.onAttachConfirm(true, transportAddress.toString());
+        return endpointSite;
+    }
+
+    /**
+     * Detaches an endpoint.
+     *
+     * @param endpoint the endpoint to detach (matched by identity)
+     * @return {@code true} if a site for the endpoint was found and removed
+     */
+    @Override
+    public boolean detach(TransportEndpoint endpoint) {
+        synchronized (this) {
+            // Find the key first and remove after the loop, so the map is never
+            // modified while its entry set is being iterated.
+            String attachedId = null;
+            for (Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
+                if (entry.getValue().getEndpoint() == endpoint) {
+                    attachedId = entry.getKey();
+                    break;
+                }
+            }
+            if (attachedId != null) {
+                _endpointMap.remove(attachedId);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Schedules output processing for a site on the worker pool. Exceptions
+     * thrown while processing are logged, not propagated.
+     *
+     * @param site the site whose queued output should be processed
+     * @throws IllegalStateException if {@link #initialize()} has not been called
+     */
+    @Override
+    public void requestSiteOutput(final TransportEndpointSite site) {
+        if (_executor == null) {
+            throw new IllegalStateException("initialize() must be called before requesting site output");
+        }
+
+        _executor.execute(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    site.processOutput();
+                    site.ackOutputProcessSignal();
+                } catch (Throwable e) {
+                    s_logger.error("Unhandled exception", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Wraps a message in a data PDU and dispatches it to its destination.
+     *
+     * @param sourceEndpointAddress address string of the sender
+     * @param targetEndpointAddress address string of the receiver
+     * @param multiplexier          multiplexer identifier carried with the message
+     * @param message               message content
+     */
+    @Override
+    public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress,
+        String multiplexier, String message) {
+
+        TransportDataPdu pdu = new TransportDataPdu();
+        pdu.setSourceAddress(sourceEndpointAddress);
+        pdu.setDestAddress(targetEndpointAddress);
+        pdu.setMultiplexier(multiplexier);
+        pdu.setContent(message);
+
+        dispatchPdu(pdu);
+    }
+
+    /** Queues a PDU on the local destination site; undeliverable PDUs are logged and dropped. */
+    private void dispatchPdu(TransportPdu pdu) {
+
+        TransportAddress transportAddress = TransportAddress.fromAddressString(pdu.getDestAddress());
+        if (transportAddress == null) {
+            // fromAddressString returns null for missing or malformed addresses.
+            s_logger.warn("Dropping PDU with missing or malformed destination address: " + pdu.getDestAddress());
+            return;
+        }
+
+        if (isLocalAddress(transportAddress)) {
+            TransportEndpointSite endpointSite;
+            synchronized (this) {
+                endpointSite = _endpointMap.get(transportAddress.getEndpointId());
+            }
+
+            if (endpointSite != null) {
+                endpointSite.addOutputPdu(pdu);
+            } else {
+                s_logger.warn("Dropping PDU, no endpoint attached at " + pdu.getDestAddress());
+            }
+        } else {
+            // do cross-node forwarding
+            // ???
+            s_logger.warn("Dropping PDU, cross-node forwarding is not implemented: " + pdu.getDestAddress());
+        }
+    }
+
+    /** An address is local when its node id is this node's id or the local-service marker. */
+    private boolean isLocalAddress(TransportAddress address) {
+        String nodeId = address.getNodeId();
+        return nodeId.equals(_nodeId) || nodeId.equals(TransportAddress.LOCAL_SERVICE_NODE);
+    }
+
+    private synchronized int getNextEndpointId() {
+        return _nextEndpointId++;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java
new file mode 100644
index 0000000..4a5ad79
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddress.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.transport;
+
+import java.util.Random;
+
+/**
+ * Address of a transport endpoint, made of a node id, an endpoint id and an
+ * integer "magic" value. Its string form is {@code nodeId.endpointId.magic},
+ * which is why neither id may contain a '.'.
+ */
+public class TransportAddress {
+    /** Node id that denotes the local node. */
+    public final static String LOCAL_SERVICE_NODE = "";
+
+    private String _nodeId = LOCAL_SERVICE_NODE;
+    private String _endpointId;
+    private int _magic;
+
+    /**
+     * Creates an address with a random magic value.
+     *
+     * @param nodeId     node id; must not be null or contain '.'
+     * @param endpointId endpoint id; must not be null or contain '.'
+     */
+    public TransportAddress(String nodeId, String endpointId) {
+        this(nodeId, endpointId, new Random().nextInt());
+    }
+
+    /**
+     * Creates an address with an explicit magic value.
+     *
+     * @param nodeId     node id; must not be null or contain '.'
+     * @param endpointId endpoint id; must not be null or contain '.'
+     * @param magic      magic value of the address
+     */
+    public TransportAddress(String nodeId, String endpointId, int magic) {
+        assert (nodeId != null);
+        assert (endpointId != null);
+        assert (nodeId.indexOf(".") < 0);
+        assert (endpointId.indexOf(".") < 0);
+
+        _nodeId = nodeId;
+        _endpointId = endpointId;
+        _magic = magic;
+    }
+
+    public String getNodeId() {
+        return _nodeId;
+    }
+
+    public TransportAddress setNodeId(String nodeId) {
+        _nodeId = nodeId;
+        return this;
+    }
+
+    public String getEndpointId() {
+        return _endpointId;
+    }
+
+    public TransportAddress setEndpointId(String endpointId) {
+        _endpointId = endpointId;
+        return this;
+    }
+
+    /**
+     * Parses the string form produced by {@link #toString()}.
+     *
+     * @param addressString text of the form {@code nodeId.endpointId.magic}
+     * @return the parsed address, or {@code null} if the text is null, empty,
+     *         does not have exactly three '.'-separated parts, or its magic
+     *         part is not an integer
+     */
+    public static TransportAddress fromAddressString(String addressString) {
+        if (addressString == null || addressString.isEmpty()) {
+            return null;
+        }
+
+        String[] tokens = addressString.split("\\.");
+        if (tokens.length != 3) {
+            return null;
+        }
+
+        int magic;
+        try {
+            magic = Integer.parseInt(tokens[2]);
+        } catch (NumberFormatException e) {
+            // Treat a non-numeric magic like every other malformed address.
+            return null;
+        }
+
+        return new TransportAddress(tokens[0], tokens[1], magic);
+    }
+
+    /**
+     * Builds the address of a well-known endpoint on the local node (magic 0).
+     *
+     * @param predefinedIdentifier the well-known endpoint id
+     * @return the corresponding local address
+     */
+    public static TransportAddress getLocalPredefinedTransportAddress(String predefinedIdentifier) {
+        return new TransportAddress(LOCAL_SERVICE_NODE, predefinedIdentifier, 0);
+    }
+
+    @Override
+    public int hashCode() {
+        // Null-safe: the setters accept null, and toString() already tolerates a null node id.
+        int hashCode = _magic;
+        hashCode = (hashCode << 3) ^ (_nodeId == null ? 0 : _nodeId.hashCode());
+        hashCode = (hashCode << 3) ^ (_endpointId == null ? 0 : _endpointId.hashCode());
+
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof TransportAddress)) {
+            return false;
+        }
+
+        TransportAddress that = (TransportAddress)other;
+        return _magic == that._magic &&
+            sameText(_nodeId, that._nodeId) &&
+            sameText(_endpointId, that._endpointId);
+    }
+
+    /** Null-safe string equality. */
+    private static boolean sameText(String left, String right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        if (_nodeId != null) {
+            sb.append(_nodeId);
+        }
+        sb.append(".");
+        sb.append(_endpointId);
+        sb.append(".");
+        sb.append(_magic);
+
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java
new file mode 100644
index 0000000..6edb788
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportAddressMapper.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.transport;
+
+/**
+ * Something that can supply an address string.
+ *
+ * NOTE(review): presumably the string form of a transport address; confirm
+ * against implementations.
+ */
+public interface TransportAddressMapper {
+
+    /** @return the address as a string */
+    String getAddress();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java
new file mode 100644
index 0000000..ac9e06d
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportDataPdu.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.transport;
+
+import org.apache.cloudstack.framework.serializer.OnwireName;
+
+/**
+ * Transport PDU that carries a message: the content string plus the
+ * multiplexer identifier it was sent with. Known on the wire as
+ * "TransportDataPdu".
+ */
+@OnwireName(name = "TransportDataPdu")
+public class TransportDataPdu extends TransportPdu {
+
+    private String _multiplexier;
+    private String _content;
+
+    /** Creates an empty PDU; the fields are filled in through the setters. */
+    public TransportDataPdu() {
+    }
+
+    /** @return the multiplexer identifier, or {@code null} if not set */
+    public String getMultiplexier() {
+        return this._multiplexier;
+    }
+
+    /** @param multiplexier the multiplexer identifier to carry */
+    public void setMultiplexier(String multiplexier) {
+        this._multiplexier = multiplexier;
+    }
+
+    /** @return the message content, or {@code null} if not set */
+    public String getContent() {
+        return this._content;
+    }
+
+    /** @param content the message content to carry */
+    public void setContent(String content) {
+        this._content = content;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java
new file mode 100644
index 0000000..7767c35
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/transport/TransportEndpoint.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.transport;
+
+/**
+ * A party that can be attached to a transport provider. Besides receiving
+ * transport messages (inherited from {@link TransportMultiplexier}), it is
+ * told about attachment changes through the callbacks below.
+ */
+public interface TransportEndpoint extends TransportMultiplexier {
+
+    /**
+     * Reports the outcome of an attach request.
+     *
+     * @param bSuccess        whether the attach succeeded
+     * @param endpointAddress string form of the address assigned to this endpoint
+     */
+    void onAttachConfirm(boolean bSuccess, String endpointAddress);
+
+    /**
+     * Reports a detach.
+     *
+     * NOTE(review): presumably invoked when the endpoint is detached from its
+     * provider; no caller is visible here, confirm the exact trigger.
+     *
+     * @param endpointAddress string form of the affected address
+     */
+    void onDetachIndication(String endpointAddress);
+}