You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2012/12/14 04:32:01 UTC
[1/4] git commit: Refactor and finalize framework IPC java package
structure
Updated Branches:
refs/heads/javelin e998ee59f -> 01a4a51ab
Refactor and finalize framework IPC java package structure
Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/01a4a51a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/01a4a51a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/01a4a51a
Branch: refs/heads/javelin
Commit: 01a4a51abf65904e0bde1cc9928e7011c7f19096
Parents: e998ee5
Author: Kelven Yang <ke...@gmail.com>
Authored: Thu Dec 13 18:44:57 2012 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Thu Dec 13 18:44:57 2012 -0800
----------------------------------------------------------------------
.../framework/async/AsyncCallbackDispatcher.java | 140 +++++++
.../framework/async/AsyncCallbackDriver.java | 24 ++
.../framework/async/AsyncCallbackHandler.java | 30 ++
.../framework/async/AsyncCompletionCallback.java | 23 ++
.../async/InplaceAsyncCallbackDriver.java | 28 ++
.../apache/cloudstack/framework/async/Void.java | 27 ++
.../framework/client/ClientEventBus.java | 31 ++
.../framework/client/ClientTransportEndpoint.java | 40 ++
.../framework/client/ClientTransportProvider.java | 64 +++
.../cloudstack/framework/eventbus/EventBus.java | 32 ++
.../framework/eventbus/EventBusBase.java | 308 +++++++++++++++
.../framework/eventbus/EventBusEndpoint.java | 61 +++
.../framework/eventbus/EventDispatcher.java | 104 +++++
.../framework/eventbus/EventHandler.java | 30 ++
.../framework/eventbus/PublishScope.java | 24 ++
.../cloudstack/framework/eventbus/Subscriber.java | 24 ++
.../messaging/AsyncCallbackDispatcher.java | 139 -------
.../framework/messaging/AsyncCallbackDriver.java | 23 --
.../framework/messaging/AsyncCallbackHandler.java | 30 --
.../messaging/AsyncCompletionCallback.java | 23 --
.../cloudstack/framework/messaging/EventBus.java | 30 --
.../framework/messaging/EventBusBase.java | 306 --------------
.../framework/messaging/EventBusEndpoint.java | 60 ---
.../framework/messaging/EventDispatcher.java | 103 -----
.../framework/messaging/EventHandler.java | 30 --
.../messaging/InplaceAsyncCallbackDriver.java | 27 --
.../framework/messaging/JsonMessageSerializer.java | 86 ----
.../framework/messaging/MessageSerializer.java | 24 --
.../framework/messaging/OnwireClassRegistry.java | 179 ---------
.../cloudstack/framework/messaging/OnwireName.java | 31 --
.../framework/messaging/PublishScope.java | 24 --
.../framework/messaging/RpcCallRequestPdu.java | 66 ---
.../framework/messaging/RpcCallResponsePdu.java | 78 ----
.../framework/messaging/RpcCallbackDispatcher.java | 87 ----
.../framework/messaging/RpcCallbackHandler.java | 30 --
.../framework/messaging/RpcCallbackListener.java | 24 --
.../framework/messaging/RpcClientCall.java | 46 ---
.../framework/messaging/RpcClientCallImpl.java | 234 -----------
.../framework/messaging/RpcException.java | 35 --
.../framework/messaging/RpcIOException.java | 36 --
.../framework/messaging/RpcProvider.java | 42 --
.../framework/messaging/RpcProviderImpl.java | 243 ------------
.../framework/messaging/RpcServerCall.java | 27 --
.../framework/messaging/RpcServerCallImpl.java | 71 ----
.../framework/messaging/RpcServiceDispatcher.java | 117 ------
.../framework/messaging/RpcServiceEndpoint.java | 30 --
.../framework/messaging/RpcServiceHandler.java | 30 --
.../framework/messaging/RpcTimeoutException.java | 32 --
.../cloudstack/framework/messaging/Subscriber.java | 24 --
.../framework/messaging/TransportAddress.java | 123 ------
.../messaging/TransportAddressMapper.java | 23 --
.../framework/messaging/TransportDataPdu.java | 45 ---
.../framework/messaging/TransportEndpoint.java | 24 --
.../framework/messaging/TransportEndpointSite.java | 134 -------
.../framework/messaging/TransportMultiplexier.java | 24 --
.../framework/messaging/TransportPdu.java | 40 --
.../framework/messaging/TransportProvider.java | 32 --
.../cloudstack/framework/messaging/Void.java | 27 --
.../framework/messaging/client/ClientEventBus.java | 31 --
.../messaging/client/ClientTransportEndpoint.java | 40 --
.../messaging/client/ClientTransportProvider.java | 64 ---
.../framework/messaging/server/ServerEventBus.java | 31 --
.../messaging/server/ServerTransportProvider.java | 190 ---------
.../framework/rpc/RpcCallRequestPdu.java | 68 ++++
.../framework/rpc/RpcCallResponsePdu.java | 80 ++++
.../framework/rpc/RpcCallbackDispatcher.java | 88 ++++
.../framework/rpc/RpcCallbackHandler.java | 30 ++
.../framework/rpc/RpcCallbackListener.java | 25 ++
.../cloudstack/framework/rpc/RpcClientCall.java | 47 +++
.../framework/rpc/RpcClientCallImpl.java | 235 +++++++++++
.../cloudstack/framework/rpc/RpcException.java | 35 ++
.../cloudstack/framework/rpc/RpcIOException.java | 37 ++
.../cloudstack/framework/rpc/RpcProvider.java | 46 +++
.../cloudstack/framework/rpc/RpcProviderImpl.java | 250 ++++++++++++
.../cloudstack/framework/rpc/RpcServerCall.java | 27 ++
.../framework/rpc/RpcServerCallImpl.java | 72 ++++
.../framework/rpc/RpcServiceDispatcher.java | 118 ++++++
.../framework/rpc/RpcServiceEndpoint.java | 31 ++
.../framework/rpc/RpcServiceHandler.java | 30 ++
.../framework/rpc/RpcTimeoutException.java | 33 ++
.../serializer/JsonMessageSerializer.java | 87 ++++
.../framework/serializer/MessageSerializer.java | 24 ++
.../framework/serializer/OnwireClassRegistry.java | 180 +++++++++
.../framework/serializer/OnwireName.java | 31 ++
.../framework/server/ServerEventBus.java | 31 ++
.../framework/server/ServerTransportProvider.java | 190 +++++++++
.../framework/transport/TransportAddress.java | 123 ++++++
.../transport/TransportAddressMapper.java | 23 ++
.../framework/transport/TransportDataPdu.java | 47 +++
.../framework/transport/TransportEndpoint.java | 24 ++
.../framework/transport/TransportEndpointSite.java | 134 +++++++
.../framework/transport/TransportMultiplexier.java | 24 ++
.../framework/transport/TransportPdu.java | 40 ++
.../framework/transport/TransportProvider.java | 34 ++
.../framework/codestyle/AsyncSampleCallee.java | 36 ++
.../AsyncSampleEventDrivenStyleCaller.java | 49 +++
.../codestyle/AsyncSampleListenerStyleCaller.java | 38 ++
.../codestyle/ClientOnlyEventDrivenStyle.java | 53 +++
.../codestyle/ClientOnlyListenerStyle.java | 59 +++
.../framework/messaging/AsyncSampleCallee.java | 34 --
.../AsyncSampleEventDrivenStyleCaller.java | 45 ---
.../messaging/AsyncSampleListenerStyleCaller.java | 36 --
.../messaging/ClientOnlyEventDrivenStyle.java | 46 ---
.../messaging/ClientOnlyListenerStyle.java | 52 ---
.../framework/messaging/SampleComponent.java | 47 ---
.../framework/messaging/TestCommand.java | 25 --
.../framework/messaging/TestCommandAnswer.java | 23 --
.../cloudstack/framework/messaging/TestVolume.java | 5 -
.../messaging/server/SampleManagementServer.java | 34 --
.../server/SampleManagementServerApp.java | 56 ---
.../messaging/server/SampleManagerComponent.java | 98 -----
.../messaging/server/SampleManagerComponent2.java | 72 ----
.../server/SampleStoragePrepareAnswer.java | 37 --
.../server/SampleStoragePrepareCommand.java | 47 ---
.../sampleserver/SampleManagementServer.java | 34 ++
.../sampleserver/SampleManagementServerApp.java | 56 +++
.../sampleserver/SampleManagerComponent.java | 98 +++++
.../sampleserver/SampleManagerComponent2.java | 72 ++++
.../sampleserver/SampleStoragePrepareAnswer.java | 37 ++
.../sampleserver/SampleStoragePrepareCommand.java | 47 +++
.../resources/SampleManagementServerAppContext.xml | 12 +-
121 files changed, 3819 insertions(+), 3858 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
new file mode 100644
index 0000000..72bf9d2
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDispatcher.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.async;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@SuppressWarnings("rawtypes")
+public class AsyncCallbackDispatcher implements AsyncCompletionCallback {
+ private static Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
+
+ private Map<String, Object> _contextMap = new HashMap<String, Object>();
+ private String _operationName;
+ private Object _targetObject;
+ private Object _resultObject;
+ private AsyncCallbackDriver _driver = new InplaceAsyncCallbackDriver();
+
+ public AsyncCallbackDispatcher(Object target) {
+ assert(target != null);
+ _targetObject = target;
+ }
+
+ public AsyncCallbackDispatcher setContextParam(String key, Object param) {
+ _contextMap.put(key, param);
+ return this;
+ }
+
+ public AsyncCallbackDispatcher attachDriver(AsyncCallbackDriver driver) {
+ assert(driver != null);
+ _driver = driver;
+
+ return this;
+ }
+
+ public AsyncCallbackDispatcher setOperationName(String name) {
+ _operationName = name;
+ return this;
+ }
+
+ public String getOperationName() {
+ return _operationName;
+ }
+
+ public Object getTargetObject() {
+ return _targetObject;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getContextParam(String key) {
+ return (T)_contextMap.get(key);
+ }
+
+ public void complete(Object resultObject) {
+ _resultObject = resultObject;
+ _driver.performCompletionCallback(this);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getResult() {
+ return (T)_resultObject;
+ }
+
+ public static boolean dispatch(Object target, AsyncCallbackDispatcher callback) {
+ assert(callback != null);
+ assert(target != null);
+
+ Method handler = resolveHandler(target.getClass(), callback.getOperationName());
+ if(handler == null)
+ return false;
+
+ try {
+ handler.invoke(target, callback);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("IllegalArgumentException when invoking RPC callback for command: " + callback.getOperationName());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("IllegalAccessException when invoking RPC callback for command: " + callback.getOperationName());
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("InvocationTargetException when invoking RPC callback for command: " + callback.getOperationName());
+ }
+
+ return true;
+ }
+
+ public static Method resolveHandler(Class<?> handlerClz, String command) {
+ synchronized(s_handlerCache) {
+ Map<String, Method> handlerMap = getAndSetHandlerMap(handlerClz);
+
+ Method handler = handlerMap.get(command);
+ if(handler != null)
+ return handler;
+
+ for(Method method : handlerClz.getDeclaredMethods()) {
+ AsyncCallbackHandler annotation = method.getAnnotation(AsyncCallbackHandler.class);
+ if(annotation != null) {
+ if(annotation.operationName().equals(command)) {
+ handlerMap.put(command, method);
+ method.setAccessible(true);
+ return method;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private static Map<String, Method> getAndSetHandlerMap(Class<?> handlerClz) {
+ Map<String, Method> handlerMap;
+ synchronized(s_handlerCache) {
+ handlerMap = s_handlerCache.get(handlerClz);
+
+ if(handlerMap == null) {
+ handlerMap = new HashMap<String, Method>();
+ s_handlerCache.put(handlerClz, handlerMap);
+ }
+ }
+
+ return handlerMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDriver.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDriver.java b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDriver.java
new file mode 100644
index 0000000..d14f1a7
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackDriver.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.async;
+
+
+public interface AsyncCallbackDriver {
+ public void performCompletionCallback(AsyncCallbackDispatcher dispatcher);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackHandler.java
new file mode 100644
index 0000000..40b3bdd
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCallbackHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.async;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AsyncCallbackHandler {
+ String operationName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCompletionCallback.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCompletionCallback.java b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCompletionCallback.java
new file mode 100644
index 0000000..7cdf5fe
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/AsyncCompletionCallback.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.async;
+
+public interface AsyncCompletionCallback <T> {
+ void complete(T resultObject);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/async/InplaceAsyncCallbackDriver.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/InplaceAsyncCallbackDriver.java b/framework/ipc/src/org/apache/cloudstack/framework/async/InplaceAsyncCallbackDriver.java
new file mode 100644
index 0000000..ece9121
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/InplaceAsyncCallbackDriver.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.async;
+
+
+public class InplaceAsyncCallbackDriver implements AsyncCallbackDriver {
+
+ @Override
+ public void performCompletionCallback(AsyncCallbackDispatcher callback) {
+ AsyncCallbackDispatcher.dispatch(callback.getTargetObject(), callback);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/async/Void.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/async/Void.java b/framework/ipc/src/org/apache/cloudstack/framework/async/Void.java
new file mode 100644
index 0000000..b4c6d4a
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/async/Void.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.async;
+
+/**
+ * This is place-holder class to help AsyncMethod to indicate void return value
+ * public void AsyncMethod(Object realParam, AsyncCompletionCallback<Void> callback) {
+ *
+ */
+public class Void {
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
new file mode 100644
index 0000000..7930bf2
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientEventBus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.client;
+
+import org.apache.cloudstack.framework.eventbus.EventBusBase;
+import org.apache.cloudstack.framework.transport.TransportMultiplexier;
+
+public class ClientEventBus extends EventBusBase implements TransportMultiplexier {
+
+ @Override
+ public void onTransportMessage(String senderEndpointAddress,
+ String targetEndpointAddress, String multiplexer, String message) {
+ // TODO Auto-generated method stub
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpoint.java
new file mode 100644
index 0000000..37fe5af
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportEndpoint.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.client;
+
+import org.apache.cloudstack.framework.transport.TransportEndpoint;
+
+public class ClientTransportEndpoint implements TransportEndpoint {
+
+ @Override
+ public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void onDetachIndication(String endpointAddress) {
+ }
+
+ @Override
+ public void onTransportMessage(String senderEndpointAddress,
+ String targetEndpointAddress, String multiplexer, String message) {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java
new file mode 100644
index 0000000..3d76e3b
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/client/ClientTransportProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.client;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import org.apache.cloudstack.framework.transport.TransportEndpoint;
+import org.apache.cloudstack.framework.transport.TransportEndpointSite;
+import org.apache.cloudstack.framework.transport.TransportProvider;
+
+public class ClientTransportProvider implements TransportProvider {
+
+ private MessageSerializer _messageSerializer;
+
+ @Override
+ public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean detach(TransportEndpoint endpoint) {
+ // TODO Auto-generated method stub
+
+ return false;
+ }
+
+ @Override
+ public void setMessageSerializer(MessageSerializer messageSerializer) {
+ assert(messageSerializer != null);
+ _messageSerializer = messageSerializer;
+ }
+
+ @Override
+ public MessageSerializer getMessageSerializer() {
+ return _messageSerializer;
+ }
+
+ @Override
+ public void requestSiteOutput(TransportEndpointSite site) {
+ // ???
+ }
+
+ @Override
+ public void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
+ String multiplexier, String message) {
+ // TODO
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java
new file mode 100644
index 0000000..200715c
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBus.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.eventbus;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+
+public interface EventBus {
+ void setMessageSerializer(MessageSerializer messageSerializer);
+ MessageSerializer getMessageSerializer();
+
+ void subscribe(String subject, Subscriber subscriber);
+ void unsubscribe(String subject, Subscriber subscriber);
+
+ void publish(String senderAddress, String subject, PublishScope scope, Object args);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java
new file mode 100644
index 0000000..30a847f
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusBase.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.eventbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cloudstack.framework.serializer.MessageSerializer;
+
+public class EventBusBase implements EventBus {
+
+ private Gate _gate;
+ private List<ActionRecord> _pendingActions;
+
+ private SubscriptionNode _subscriberRoot;
+ private MessageSerializer _messageSerializer;
+
+ public EventBusBase() {
+ _gate = new Gate();
+ _pendingActions = new ArrayList<ActionRecord>();
+
+ _subscriberRoot = new SubscriptionNode("/", null);
+ }
+
+ @Override
+ public void setMessageSerializer(MessageSerializer messageSerializer) {
+ _messageSerializer = messageSerializer;
+ }
+
+ @Override
+ public MessageSerializer getMessageSerializer() {
+ return _messageSerializer;
+ }
+
+ @Override
+ public void subscribe(String subject, Subscriber subscriber) {
+ assert(subject != null);
+ assert(subscriber != null);
+ if(_gate.enter()) {
+ SubscriptionNode current = locate(subject, null, true);
+ assert(current != null);
+ current.addSubscriber(subscriber);
+ _gate.leave();
+ } else {
+ synchronized(_pendingActions) {
+ _pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber));
+ }
+ }
+ }
+
+ @Override
+ public void unsubscribe(String subject, Subscriber subscriber) {
+ if(_gate.enter()) {
+ SubscriptionNode current = locate(subject, null, false);
+ if(current != null)
+ current.removeSubscriber(subscriber);
+
+ _gate.leave();
+ } else {
+ synchronized(_pendingActions) {
+ _pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber));
+ }
+ }
+ }
+
+ @Override
+ public void publish(String senderAddress, String subject, PublishScope scope,
+ Object args) {
+
+ if(_gate.enter(true)) {
+
+ List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
+ SubscriptionNode current = locate(subject, chainFromTop, false);
+
+ if(current != null)
+ current.notifySubscribers(senderAddress, subject, args);
+
+ Collections.reverse(chainFromTop);
+ for(SubscriptionNode node : chainFromTop)
+ node.notifySubscribers(senderAddress, subject, args);
+
+ _gate.leave();
+ }
+ }
+
+ private void onGateOpen() {
+ synchronized(_pendingActions) {
+ ActionRecord record = null;
+ if(_pendingActions.size() > 0) {
+ while((record = _pendingActions.remove(0)) != null) {
+ switch(record.getType()) {
+ case Subscribe :
+ {
+ SubscriptionNode current = locate(record.getSubject(), null, true);
+ assert(current != null);
+ current.addSubscriber(record.getSubscriber());
+ }
+ break;
+
+ case Unsubscribe :
+ {
+ SubscriptionNode current = locate(record.getSubject(), null, false);
+ if(current != null)
+ current.removeSubscriber(record.getSubscriber());
+ }
+ break;
+
+ default :
+ assert(false);
+ break;
+
+ }
+ }
+ }
+ }
+ }
+
+
+ private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
+ boolean createPath) {
+
+ assert(subject != null);
+
+ String[] subjectPathTokens = subject.split("\\.");
+ return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
+ }
+
+ private static SubscriptionNode locate(String[] subjectPathTokens,
+ SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath) {
+
+ assert(current != null);
+ assert(subjectPathTokens != null);
+ assert(subjectPathTokens.length > 0);
+
+ if(chainFromTop != null)
+ chainFromTop.add(current);
+
+ SubscriptionNode next = current.getChild(subjectPathTokens[0]);
+ if(next == null) {
+ if(createPath) {
+ next = new SubscriptionNode(subjectPathTokens[0], null);
+ current.addChild(subjectPathTokens[0], next);
+ } else {
+ return null;
+ }
+ }
+
+ if(subjectPathTokens.length > 1) {
+ return locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length),
+ next, chainFromTop, createPath);
+ } else {
+ return next;
+ }
+ }
+
+
+ //
+ // Support inner classes
+ //
+ private static enum ActionType {
+ Subscribe,
+ Unsubscribe
+ }
+
+ private static class ActionRecord {
+ private ActionType _type;
+ private String _subject;
+ private Subscriber _subscriber;
+
+ public ActionRecord(ActionType type, String subject, Subscriber subscriber) {
+ _type = type;
+ _subject = subject;
+ _subscriber = subscriber;
+ }
+
+ public ActionType getType() {
+ return _type;
+ }
+
+ public String getSubject() {
+ return _subject;
+ }
+
+ public Subscriber getSubscriber() {
+ return _subscriber;
+ }
+ }
+
+ private class Gate {
+ private int _reentranceCount;
+ private Thread _gateOwner;
+
+ public Gate() {
+ _reentranceCount = 0;
+ _gateOwner = null;
+ }
+
+ public boolean enter() {
+ return enter(false);
+ }
+
+ public boolean enter(boolean wait) {
+ while(true) {
+ synchronized(this) {
+ if(_reentranceCount == 0) {
+ assert(_gateOwner == null);
+
+ _reentranceCount++;
+ _gateOwner = Thread.currentThread();
+ return true;
+ } else {
+ if(wait) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public void leave() {
+ synchronized(this) {
+ if(_reentranceCount > 0) {
+ assert(_gateOwner == Thread.currentThread());
+
+ onGateOpen();
+ _reentranceCount--;
+ assert(_reentranceCount == 0);
+ _gateOwner = null;
+
+ notifyAll();
+ }
+ }
+ }
+ }
+
+ private static class SubscriptionNode {
+ @SuppressWarnings("unused")
+ private String _nodeKey;
+ private List<Subscriber> _subscribers;
+ private Map<String, SubscriptionNode> _children;
+
+ public SubscriptionNode(String nodeKey, Subscriber subscriber) {
+ assert(nodeKey != null);
+ _nodeKey = nodeKey;
+ _subscribers = new ArrayList<Subscriber>();
+
+ if(subscriber != null)
+ _subscribers.add(subscriber);
+
+ _children = new HashMap<String, SubscriptionNode>();
+ }
+
+ @SuppressWarnings("unused")
+ public List<Subscriber> getSubscriber() {
+ return _subscribers;
+ }
+
+ public void addSubscriber(Subscriber subscriber) {
+ _subscribers.add(subscriber);
+ }
+
+ public void removeSubscriber(Subscriber subscriber) {
+ _subscribers.remove(subscriber);
+ }
+
+ public SubscriptionNode getChild(String key) {
+ return _children.get(key);
+ }
+
+ public void addChild(String key, SubscriptionNode childNode) {
+ _children.put(key, childNode);
+ }
+
+ public void notifySubscribers(String senderAddress, String subject, Object args) {
+ for(Subscriber subscriber : _subscribers) {
+ subscriber.onPublishEvent(senderAddress, subject, args);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
new file mode 100644
index 0000000..19a9b03
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventBusEndpoint.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.eventbus;
+
+
+public class EventBusEndpoint {
+ private EventBus _eventBus;
+ private String _sender;
+ private PublishScope _scope;
+
+ public EventBusEndpoint(EventBus eventBus, String sender, PublishScope scope) {
+ _eventBus = eventBus;
+ _sender = sender;
+ _scope = scope;
+ }
+
+ public EventBusEndpoint setEventBus(EventBus eventBus) {
+ _eventBus = eventBus;
+ return this;
+ }
+
+ public EventBusEndpoint setScope(PublishScope scope) {
+ _scope = scope;
+ return this;
+ }
+
+ public PublishScope getScope() {
+ return _scope;
+ }
+
+ public EventBusEndpoint setSender(String sender) {
+ _sender = sender;
+ return this;
+ }
+
+ public String getSender() {
+ return _sender;
+ }
+
+ public void Publish(String subject, Object args) {
+ assert(_eventBus != null);
+ _eventBus.publish(_sender, subject, _scope, args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
new file mode 100644
index 0000000..336a994
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventDispatcher.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.eventbus;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class EventDispatcher implements Subscriber {
+ private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
+
+ private static Map<Object, EventDispatcher> s_targetMap = new HashMap<Object, EventDispatcher>();
+ private Object _targetObject;
+
+ public EventDispatcher(Object targetObject) {
+ _targetObject = targetObject;
+ }
+
+ @Override
+ public void onPublishEvent(String senderAddress, String subject, Object args) {
+ dispatch(_targetObject, subject, senderAddress, args);
+ }
+
+ public static EventDispatcher getDispatcher(Object targetObject) {
+ EventDispatcher dispatcher;
+ synchronized(s_targetMap) {
+ dispatcher = s_targetMap.get(targetObject);
+ if(dispatcher == null) {
+ dispatcher = new EventDispatcher(targetObject);
+ s_targetMap.put(targetObject, dispatcher);
+ }
+ }
+ return dispatcher;
+ }
+
+ public static void removeDispatcher(Object targetObject) {
+ synchronized(s_targetMap) {
+ s_targetMap.remove(targetObject);
+ }
+ }
+
+ public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
+ assert(subject != null);
+ assert(target != null);
+
+ Method handler = resolveHandler(target.getClass(), subject);
+ if(handler == null)
+ return false;
+
+ try {
+ handler.invoke(target, subject, senderAddress, args);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
+ }
+
+ return true;
+ }
+
+ public static Method resolveHandler(Class<?> handlerClz, String subject) {
+ synchronized(s_handlerCache) {
+ Method handler = s_handlerCache.get(handlerClz);
+ if(handler != null)
+ return handler;
+
+ for(Method method : handlerClz.getMethods()) {
+ EventHandler annotation = method.getAnnotation(EventHandler.class);
+ if(annotation != null) {
+ if(match(annotation.topic(), subject)) {
+ s_handlerCache.put(handlerClz, method);
+ return method;
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private static boolean match(String expression, String param) {
+ return param.matches(expression);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java
new file mode 100644
index 0000000..1ed3a00
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/EventHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cloudstack.framework.eventbus;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface EventHandler {
+ public String topic();
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java
new file mode 100644
index 0000000..539a242
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/PublishScope.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.eventbus;
+
+public enum PublishScope {
+ LOCAL, GLOBAL
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java
new file mode 100644
index 0000000..28b86de
--- /dev/null
+++ b/framework/ipc/src/org/apache/cloudstack/framework/eventbus/Subscriber.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.eventbus;
+
+public interface Subscriber {
+ void onPublishEvent(String senderAddress, String subject, Object args);
+}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
deleted file mode 100644
index 089a5d8..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDispatcher.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.messaging;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
-@SuppressWarnings("rawtypes")
-public class AsyncCallbackDispatcher implements AsyncCompletionCallback {
- private static Map<Class<?>, Map<String, Method>> s_handlerCache = new HashMap<Class<?>, Map<String, Method>>();
-
- private Map<String, Object> _contextMap = new HashMap<String, Object>();
- private String _operationName;
- private Object _targetObject;
- private Object _resultObject;
- private AsyncCallbackDriver _driver = new InplaceAsyncCallbackDriver();
-
- public AsyncCallbackDispatcher(Object target) {
- assert(target != null);
- _targetObject = target;
- }
-
- public AsyncCallbackDispatcher setContextParam(String key, Object param) {
- _contextMap.put(key, param);
- return this;
- }
-
- public AsyncCallbackDispatcher attachDriver(AsyncCallbackDriver driver) {
- assert(driver != null);
- _driver = driver;
-
- return this;
- }
-
- public AsyncCallbackDispatcher setOperationName(String name) {
- _operationName = name;
- return this;
- }
-
- public String getOperationName() {
- return _operationName;
- }
-
- public Object getTargetObject() {
- return _targetObject;
- }
-
- @SuppressWarnings("unchecked")
- public <T> T getContextParam(String key) {
- return (T)_contextMap.get(key);
- }
-
- public void complete(Object resultObject) {
- _resultObject = resultObject;
- _driver.performCompletionCallback(this);
- }
-
- @SuppressWarnings("unchecked")
- public <T> T getResult() {
- return (T)_resultObject;
- }
-
- public static boolean dispatch(Object target, AsyncCallbackDispatcher callback) {
- assert(callback != null);
- assert(target != null);
-
- Method handler = resolveHandler(target.getClass(), callback.getOperationName());
- if(handler == null)
- return false;
-
- try {
- handler.invoke(target, callback);
- } catch (IllegalArgumentException e) {
- throw new RuntimeException("IllegalArgumentException when invoking RPC callback for command: " + callback.getOperationName());
- } catch (IllegalAccessException e) {
- throw new RuntimeException("IllegalAccessException when invoking RPC callback for command: " + callback.getOperationName());
- } catch (InvocationTargetException e) {
- throw new RuntimeException("InvocationTargetException when invoking RPC callback for command: " + callback.getOperationName());
- }
-
- return true;
- }
-
- public static Method resolveHandler(Class<?> handlerClz, String command) {
- synchronized(s_handlerCache) {
- Map<String, Method> handlerMap = getAndSetHandlerMap(handlerClz);
-
- Method handler = handlerMap.get(command);
- if(handler != null)
- return handler;
-
- for(Method method : handlerClz.getDeclaredMethods()) {
- AsyncCallbackHandler annotation = method.getAnnotation(AsyncCallbackHandler.class);
- if(annotation != null) {
- if(annotation.operationName().equals(command)) {
- handlerMap.put(command, method);
- method.setAccessible(true);
- return method;
- }
- }
- }
- }
-
- return null;
- }
-
- private static Map<String, Method> getAndSetHandlerMap(Class<?> handlerClz) {
- Map<String, Method> handlerMap;
- synchronized(s_handlerCache) {
- handlerMap = s_handlerCache.get(handlerClz);
-
- if(handlerMap == null) {
- handlerMap = new HashMap<String, Method>();
- s_handlerCache.put(handlerClz, handlerMap);
- }
- }
-
- return handlerMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
deleted file mode 100644
index d00aad6..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackDriver.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-public interface AsyncCallbackDriver {
- public void performCompletionCallback(AsyncCallbackDispatcher dispatcher);
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
deleted file mode 100644
index 0df02d8..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCallbackHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface AsyncCallbackHandler {
- String operationName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
deleted file mode 100644
index 9099594..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/AsyncCompletionCallback.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-public interface AsyncCompletionCallback <T> {
- void complete(T resultObject);
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java
deleted file mode 100644
index f46ee25..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.messaging;
-
-public interface EventBus {
- void setMessageSerializer(MessageSerializer messageSerializer);
- MessageSerializer getMessageSerializer();
-
- void subscribe(String subject, Subscriber subscriber);
- void unsubscribe(String subject, Subscriber subscriber);
-
- void publish(String senderAddress, String subject, PublishScope scope, Object args);
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java
deleted file mode 100644
index 6586890..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.messaging;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class EventBusBase implements EventBus {
-
- private Gate _gate;
- private List<ActionRecord> _pendingActions;
-
- private SubscriptionNode _subscriberRoot;
- private MessageSerializer _messageSerializer;
-
- public EventBusBase() {
- _gate = new Gate();
- _pendingActions = new ArrayList<ActionRecord>();
-
- _subscriberRoot = new SubscriptionNode("/", null);
- }
-
- @Override
- public void setMessageSerializer(MessageSerializer messageSerializer) {
- _messageSerializer = messageSerializer;
- }
-
- @Override
- public MessageSerializer getMessageSerializer() {
- return _messageSerializer;
- }
-
- @Override
- public void subscribe(String subject, Subscriber subscriber) {
- assert(subject != null);
- assert(subscriber != null);
- if(_gate.enter()) {
- SubscriptionNode current = locate(subject, null, true);
- assert(current != null);
- current.addSubscriber(subscriber);
- _gate.leave();
- } else {
- synchronized(_pendingActions) {
- _pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber));
- }
- }
- }
-
- @Override
- public void unsubscribe(String subject, Subscriber subscriber) {
- if(_gate.enter()) {
- SubscriptionNode current = locate(subject, null, false);
- if(current != null)
- current.removeSubscriber(subscriber);
-
- _gate.leave();
- } else {
- synchronized(_pendingActions) {
- _pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber));
- }
- }
- }
-
- @Override
- public void publish(String senderAddress, String subject, PublishScope scope,
- Object args) {
-
- if(_gate.enter(true)) {
-
- List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
- SubscriptionNode current = locate(subject, chainFromTop, false);
-
- if(current != null)
- current.notifySubscribers(senderAddress, subject, args);
-
- Collections.reverse(chainFromTop);
- for(SubscriptionNode node : chainFromTop)
- node.notifySubscribers(senderAddress, subject, args);
-
- _gate.leave();
- }
- }
-
- private void onGateOpen() {
- synchronized(_pendingActions) {
- ActionRecord record = null;
- if(_pendingActions.size() > 0) {
- while((record = _pendingActions.remove(0)) != null) {
- switch(record.getType()) {
- case Subscribe :
- {
- SubscriptionNode current = locate(record.getSubject(), null, true);
- assert(current != null);
- current.addSubscriber(record.getSubscriber());
- }
- break;
-
- case Unsubscribe :
- {
- SubscriptionNode current = locate(record.getSubject(), null, false);
- if(current != null)
- current.removeSubscriber(record.getSubscriber());
- }
- break;
-
- default :
- assert(false);
- break;
-
- }
- }
- }
- }
- }
-
-
- private SubscriptionNode locate(String subject, List<SubscriptionNode> chainFromTop,
- boolean createPath) {
-
- assert(subject != null);
-
- String[] subjectPathTokens = subject.split("\\.");
- return locate(subjectPathTokens, _subscriberRoot, chainFromTop, createPath);
- }
-
- private static SubscriptionNode locate(String[] subjectPathTokens,
- SubscriptionNode current, List<SubscriptionNode> chainFromTop, boolean createPath) {
-
- assert(current != null);
- assert(subjectPathTokens != null);
- assert(subjectPathTokens.length > 0);
-
- if(chainFromTop != null)
- chainFromTop.add(current);
-
- SubscriptionNode next = current.getChild(subjectPathTokens[0]);
- if(next == null) {
- if(createPath) {
- next = new SubscriptionNode(subjectPathTokens[0], null);
- current.addChild(subjectPathTokens[0], next);
- } else {
- return null;
- }
- }
-
- if(subjectPathTokens.length > 1) {
- return locate((String[])Arrays.copyOfRange(subjectPathTokens, 1, subjectPathTokens.length),
- next, chainFromTop, createPath);
- } else {
- return next;
- }
- }
-
-
- //
- // Support inner classes
- //
- private static enum ActionType {
- Subscribe,
- Unsubscribe
- }
-
- private static class ActionRecord {
- private ActionType _type;
- private String _subject;
- private Subscriber _subscriber;
-
- public ActionRecord(ActionType type, String subject, Subscriber subscriber) {
- _type = type;
- _subject = subject;
- _subscriber = subscriber;
- }
-
- public ActionType getType() {
- return _type;
- }
-
- public String getSubject() {
- return _subject;
- }
-
- public Subscriber getSubscriber() {
- return _subscriber;
- }
- }
-
- private class Gate {
- private int _reentranceCount;
- private Thread _gateOwner;
-
- public Gate() {
- _reentranceCount = 0;
- _gateOwner = null;
- }
-
- public boolean enter() {
- return enter(false);
- }
-
- public boolean enter(boolean wait) {
- while(true) {
- synchronized(this) {
- if(_reentranceCount == 0) {
- assert(_gateOwner == null);
-
- _reentranceCount++;
- _gateOwner = Thread.currentThread();
- return true;
- } else {
- if(wait) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- } else {
- break;
- }
- }
- }
- }
-
- return false;
- }
-
- public void leave() {
- synchronized(this) {
- if(_reentranceCount > 0) {
- assert(_gateOwner == Thread.currentThread());
-
- onGateOpen();
- _reentranceCount--;
- assert(_reentranceCount == 0);
- _gateOwner = null;
-
- notifyAll();
- }
- }
- }
- }
-
- private static class SubscriptionNode {
- @SuppressWarnings("unused")
- private String _nodeKey;
- private List<Subscriber> _subscribers;
- private Map<String, SubscriptionNode> _children;
-
- public SubscriptionNode(String nodeKey, Subscriber subscriber) {
- assert(nodeKey != null);
- _nodeKey = nodeKey;
- _subscribers = new ArrayList<Subscriber>();
-
- if(subscriber != null)
- _subscribers.add(subscriber);
-
- _children = new HashMap<String, SubscriptionNode>();
- }
-
- @SuppressWarnings("unused")
- public List<Subscriber> getSubscriber() {
- return _subscribers;
- }
-
- public void addSubscriber(Subscriber subscriber) {
- _subscribers.add(subscriber);
- }
-
- public void removeSubscriber(Subscriber subscriber) {
- _subscribers.remove(subscriber);
- }
-
- public SubscriptionNode getChild(String key) {
- return _children.get(key);
- }
-
- public void addChild(String key, SubscriptionNode childNode) {
- _children.put(key, childNode);
- }
-
- public void notifySubscribers(String senderAddress, String subject, Object args) {
- for(Subscriber subscriber : _subscribers) {
- subscriber.onPublishEvent(senderAddress, subject, args);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
deleted file mode 100644
index b51fb6d..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusEndpoint.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cloudstack.framework.messaging;
-
-public class EventBusEndpoint {
- private EventBus _eventBus;
- private String _sender;
- private PublishScope _scope;
-
- public EventBusEndpoint(EventBus eventBus, String sender, PublishScope scope) {
- _eventBus = eventBus;
- _sender = sender;
- _scope = scope;
- }
-
- public EventBusEndpoint setEventBus(EventBus eventBus) {
- _eventBus = eventBus;
- return this;
- }
-
- public EventBusEndpoint setScope(PublishScope scope) {
- _scope = scope;
- return this;
- }
-
- public PublishScope getScope() {
- return _scope;
- }
-
- public EventBusEndpoint setSender(String sender) {
- _sender = sender;
- return this;
- }
-
- public String getSender() {
- return _sender;
- }
-
- public void Publish(String subject, Object args) {
- assert(_eventBus != null);
- _eventBus.publish(_sender, subject, _scope, args);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
deleted file mode 100644
index debc993..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
-public class EventDispatcher implements Subscriber {
- private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
-
- private static Map<Object, EventDispatcher> s_targetMap = new HashMap<Object, EventDispatcher>();
- private Object _targetObject;
-
- public EventDispatcher(Object targetObject) {
- _targetObject = targetObject;
- }
-
- @Override
- public void onPublishEvent(String senderAddress, String subject, Object args) {
- dispatch(_targetObject, subject, senderAddress, args);
- }
-
- public static EventDispatcher getDispatcher(Object targetObject) {
- EventDispatcher dispatcher;
- synchronized(s_targetMap) {
- dispatcher = s_targetMap.get(targetObject);
- if(dispatcher == null) {
- dispatcher = new EventDispatcher(targetObject);
- s_targetMap.put(targetObject, dispatcher);
- }
- }
- return dispatcher;
- }
-
- public static void removeDispatcher(Object targetObject) {
- synchronized(s_targetMap) {
- s_targetMap.remove(targetObject);
- }
- }
-
- public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
- assert(subject != null);
- assert(target != null);
-
- Method handler = resolveHandler(target.getClass(), subject);
- if(handler == null)
- return false;
-
- try {
- handler.invoke(target, subject, senderAddress, args);
- } catch (IllegalArgumentException e) {
- throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
- } catch (InvocationTargetException e) {
- throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
- }
-
- return true;
- }
-
- public static Method resolveHandler(Class<?> handlerClz, String subject) {
- synchronized(s_handlerCache) {
- Method handler = s_handlerCache.get(handlerClz);
- if(handler != null)
- return handler;
-
- for(Method method : handlerClz.getMethods()) {
- EventHandler annotation = method.getAnnotation(EventHandler.class);
- if(annotation != null) {
- if(match(annotation.topic(), subject)) {
- s_handlerCache.put(handlerClz, method);
- return method;
- }
- }
- }
- }
-
- return null;
- }
-
- private static boolean match(String expression, String param) {
- return param.matches(expression);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
deleted file mode 100644
index 5ec03f1..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface EventHandler {
- public String topic();
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
deleted file mode 100644
index 424f7df..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/InplaceAsyncCallbackDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-public class InplaceAsyncCallbackDriver implements AsyncCallbackDriver {
-
- @Override
- public void performCompletionCallback(AsyncCallbackDispatcher callback) {
- AsyncCallbackDispatcher.dispatch(callback.getTargetObject(), callback);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java
deleted file mode 100644
index 3fed857..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/JsonMessageSerializer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-public class JsonMessageSerializer implements MessageSerializer {
-
- // this will be injected from external to allow installation of
- // type adapters needed by upper layer applications
- private Gson _gson;
-
- private OnwireClassRegistry _clzRegistry;
-
- public JsonMessageSerializer() {
- GsonBuilder gsonBuilder = new GsonBuilder();
- gsonBuilder.setVersion(1.5);
- _gson = gsonBuilder.create();
- }
-
- public Gson getGson() {
- return _gson;
- }
-
- public void setGson(Gson gson) {
- _gson = gson;
- }
-
- public OnwireClassRegistry getOnwireClassRegistry() {
- return _clzRegistry;
- }
-
- public void setOnwireClassRegistry(OnwireClassRegistry clzRegistry) {
- _clzRegistry = clzRegistry;
- }
-
- @Override
- public <T> String serializeTo(Class<?> clz, T object) {
- assert(clz != null);
- assert(object != null);
-
- StringBuffer sbuf = new StringBuffer();
-
- OnwireName onwire = clz.getAnnotation(OnwireName.class);
- if(onwire == null)
- throw new RuntimeException("Class " + clz.getCanonicalName() + " is not declared to be onwire");
-
- sbuf.append(onwire.name()).append("|");
- sbuf.append(_gson.toJson(object));
-
- return sbuf.toString();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T> T serializeFrom(String message) {
- assert(message != null);
- int contentStartPos = message.indexOf('|');
- if(contentStartPos < 0)
- throw new RuntimeException("Invalid on-wire message format");
-
- String onwireName = message.substring(0, contentStartPos);
- Class<?> clz = _clzRegistry.getOnwireClass(onwireName);
- if(clz == null)
- throw new RuntimeException("Onwire class is not registered. name: " + onwireName);
-
- return (T)_gson.fromJson(message.substring(contentStartPos + 1), clz);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/01a4a51a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java
deleted file mode 100644
index d07a3ad..0000000
--- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cloudstack.framework.messaging;
-
-public interface MessageSerializer {
- <T>String serializeTo(Class<?> clz, T object);
- <T> T serializeFrom(String message);
-}