You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by da...@apache.org on 2006/08/21 07:55:39 UTC

svn commit: r433159 [2/2] - in /webservices/sandesha/trunk/c: include/ include/sandesha2/ src/handlers/ src/msgprocessors/ src/storage/beans/ src/storage/inmemory/ src/util/ src/workers/ src/wsrm/

Modified: webservices/sandesha/trunk/c/src/workers/in_order_invoker.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/in_order_invoker.c?rev=433159&r1=433158&r2=433159&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/in_order_invoker.c (original)
+++ webservices/sandesha/trunk/c/src/workers/in_order_invoker.c Sun Aug 20 22:55:38 2006
@@ -184,7 +184,6 @@
     AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
     
     invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker);
-    axis2_thread_mutex_lock(invoker_impl->mutex);
     for(i = 0; i < AXIS2_ARRAY_LIST_SIZE(invoker_impl->working_seqs, env); i++)
     {
         axis2_char_t *tmp_id = NULL;
@@ -197,7 +196,6 @@
     }
     if(0 == AXIS2_ARRAY_LIST_SIZE(invoker_impl->working_seqs, env))
         invoker_impl->run_invoker = AXIS2_FALSE;
-    axis2_thread_mutex_unlock(invoker_impl->mutex);
     return AXIS2_SUCCESS;
 }
             
@@ -209,9 +207,7 @@
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     
     invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker);
-    axis2_thread_mutex_lock(invoker_impl->mutex);
     SANDESHA2_INTF_TO_IMPL(invoker)->run_invoker = AXIS2_FALSE;
-    axis2_thread_mutex_unlock(invoker_impl->mutex);
     return AXIS2_SUCCESS;
 }
             
@@ -226,9 +222,7 @@
     AXIS2_ENV_CHECK(env, AXIS2_FALSE);
     
     invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker);
-    axis2_thread_mutex_lock(invoker_impl->mutex);
     started = SANDESHA2_INTF_TO_IMPL(invoker)->run_invoker;
-    axis2_thread_mutex_unlock(invoker_impl->mutex);
     return started;    
 }
             
@@ -244,7 +238,6 @@
     AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
     
     invoker_impl = SANDESHA2_INTF_TO_IMPL(invoker);
-    axis2_thread_mutex_lock(invoker_impl->mutex);
     if(AXIS2_FALSE == sandesha2_utils_array_list_contains(env, 
                         invoker_impl->working_seqs, seq_id))
         AXIS2_ARRAY_LIST_ADD(invoker_impl->working_seqs, env, seq_id);
@@ -254,7 +247,6 @@
         invoker_impl->run_invoker = AXIS2_TRUE;
         sandesha2_in_order_invoker_run(invoker, env);
     }
-    axis2_thread_mutex_unlock(invoker_impl->mutex);
     return AXIS2_SUCCESS;
 }
             

Modified: webservices/sandesha/trunk/c/src/workers/sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender.c?rev=433159&r1=433158&r2=433159&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender.c Sun Aug 20 22:55:38 2006
@@ -104,7 +104,7 @@
                         axiom_soap_envelope_t *soap_envelope);
                         
 
-void * AXIS2_THREAD_FUNC
+static void * AXIS2_THREAD_FUNC
 sandesha2_sender_worker_func(axis2_thread_t *thd, void *data);
 
 axis2_status_t AXIS2_CALL 
@@ -204,7 +204,6 @@
     AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
     
     sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
-    axis2_thread_mutex_lock(sender_impl->mutex);
     for(i = 0; i < AXIS2_ARRAY_LIST_SIZE(sender_impl->working_seqs, env); i++)
     {
         axis2_char_t *tmp_id = NULL;
@@ -217,7 +216,6 @@
     }
     if(0 == AXIS2_ARRAY_LIST_SIZE(sender_impl->working_seqs, env))
         sender_impl->run_sender = AXIS2_FALSE;
-    axis2_thread_mutex_unlock(sender_impl->mutex);
     return AXIS2_SUCCESS;
 }
             
@@ -229,9 +227,7 @@
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     
     sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
-    axis2_thread_mutex_lock(sender_impl->mutex);
     SANDESHA2_INTF_TO_IMPL(sender)->run_sender = AXIS2_FALSE;
-    axis2_thread_mutex_unlock(sender_impl->mutex);
     return AXIS2_SUCCESS;
 }
             
@@ -240,16 +236,12 @@
                         (sandesha2_sender_t *sender, 
                         const axis2_env_t *env)
 {
-    axis2_bool_t started = AXIS2_FALSE;
     sandesha2_sender_impl_t *sender_impl = NULL;
      
     AXIS2_ENV_CHECK(env, AXIS2_FALSE);
     
     sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
-    axis2_thread_mutex_lock(sender_impl->mutex);
-    started = SANDESHA2_INTF_TO_IMPL(sender)->run_sender;
-    axis2_thread_mutex_unlock(sender_impl->mutex);
-    return started;    
+    sender_impl->run_sender;
 }
             
 axis2_status_t AXIS2_CALL 
@@ -261,11 +253,11 @@
     sandesha2_sender_impl_t *sender_impl = NULL;
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, conf_ctx, AXIS2_FAILURE);
-    AXIS2_PARAM_CHECK(env->error, seq_id, AXIS2_FAILURE);
     
     sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
+    
     axis2_thread_mutex_lock(sender_impl->mutex);
-    if(AXIS2_FALSE == sandesha2_utils_array_list_contains(env, 
+    if(seq_id && AXIS2_FALSE == sandesha2_utils_array_list_contains(env, 
                         sender_impl->working_seqs, seq_id))
         AXIS2_ARRAY_LIST_ADD(sender_impl->working_seqs, env, seq_id);
     if(AXIS2_FALSE == sender_impl->run_sender)
@@ -279,12 +271,14 @@
 }
             
 axis2_status_t AXIS2_CALL 
-sandesha2_sender_run (sandesha2_sender_t *sender,
-                        const axis2_env_t *env)
+sandesha2_sender_run (
+        sandesha2_sender_t *sender,
+        const axis2_env_t *env)
 {
     sandesha2_sender_impl_t *sender_impl = NULL;
     axis2_thread_t *worker_thread = NULL;
     sandesha2_sender_args_t *args = NULL;
+
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     
     sender_impl = SANDESHA2_INTF_TO_IMPL(sender);
@@ -292,6 +286,11 @@
     args = AXIS2_MALLOC(env->allocator, sizeof(sandesha2_sender_args_t)); 
     args->impl = sender_impl;
     args->env = (axis2_env_t*)env;
+    /* Assigning like this causes seg faults when trying to allocate memory
+     * inside the worker function, so a new env is created instead. */
+    /*args->env->allocator = env->allocator;*/
+    args->env = axis2_env_create(env->allocator);
+
     worker_thread = AXIS2_THREAD_POOL_GET_THREAD(env->thread_pool,
                         sandesha2_sender_worker_func, (void*)args);
     if(NULL == worker_thread)
@@ -319,7 +318,6 @@
     
     AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
-    
     property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, AXIS2_TRANSPORT_IN,
                     AXIS2_FALSE);
     if(NULL == property)
@@ -437,8 +435,10 @@
 /**
  * Thread worker function.
  */
-void * AXIS2_THREAD_FUNC
-sandesha2_sender_worker_func(axis2_thread_t *thd, void *data)
+static void * AXIS2_THREAD_FUNC
+sandesha2_sender_worker_func(
+        axis2_thread_t *thd, 
+        void *data)
 {
     sandesha2_sender_impl_t *sender_impl = NULL;
     sandesha2_sender_t *sender = NULL;
@@ -462,6 +462,7 @@
         axis2_bool_t rollbacked = AXIS2_FALSE;*/
         sandesha2_sender_mgr_t *mgr = NULL;
         sandesha2_sender_bean_t *sender_bean = NULL;
+        sandesha2_sender_bean_t *bean1 = NULL;
         sandesha2_seq_property_mgr_t *seq_prop_mgr = NULL;
         axis2_char_t *key = NULL;
         axis2_msg_ctx_t *msg_ctx = NULL;
@@ -475,18 +476,20 @@
         axis2_transport_out_desc_t *transport_out = NULL;
         axis2_transport_sender_t *transport_sender = NULL;
         axis2_bool_t successfully_sent = AXIS2_FALSE;
-        sandesha2_sender_bean_t *bean1 = NULL;
-        
+        axis2_char_t *msg_id = NULL;
+   
         sleep(1);
         transaction = SANDESHA2_STORAGE_MGR_GET_TRANSACTION(storage_mgr,
                         env);
         mgr = SANDESHA2_STORAGE_MGR_GET_RETRANS_MGR(storage_mgr, env);
         seq_prop_mgr = SANDESHA2_STORAGE_MGR_GET_SEQ_PROPERTY_MGR(
                         storage_mgr, env);
-                        
         sender_bean = SANDESHA2_SENDER_MGR_GET_NEXT_MSG_TO_SEND(mgr, env);
         if(NULL == sender_bean)
+        {
+            printf("sender_bean is NULL\n");
             continue;
+        }
             
         key = SANDESHA2_SENDER_BEAN_GET_MSG_CONTEXT_REF_KEY(sender_bean, env);
         msg_ctx = SANDESHA2_STORAGE_MGR_RETRIEVE_MSG_CTX(storage_mgr, env, key, 
@@ -506,7 +509,10 @@
         continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
                         sender_bean, sender_impl->conf_ctx, storage_mgr);
         if(AXIS2_FALSE == continue_sending)
+        {
+            printf("continue_sending true\n");
             continue;
+        }
         
         property = AXIS2_MSG_CTX_GET_PROPERTY(msg_ctx, env, 
                         SANDESHA2_QUALIFIED_FOR_SENDING, AXIS2_FALSE);
@@ -526,27 +532,49 @@
         if(NULL != msgs_not_to_send)
         {
             int j = 0;
+            axis2_bool_t continue_sending = AXIS2_FALSE;
+
             for(j = 0; j < AXIS2_ARRAY_LIST_SIZE(msgs_not_to_send, env); j++)
             {
                 axis2_char_t *value = NULL;
                 int int_val = -1;
+                int msg_type = -1;
                 
                 value = AXIS2_ARRAY_LIST_GET(msgs_not_to_send, env, j);
                 int_val = atoi(value);
-                if(SANDESHA2_MSG_CTX_GET_MSG_TYPE(rm_msg_ctx, env) == int_val)
-                    continue_sending = AXIS2_FALSE;
+                msg_type = SANDESHA2_MSG_CTX_GET_MSG_TYPE(rm_msg_ctx, env);
+                if(msg_type == int_val)
+                    continue_sending = AXIS2_TRUE;
             }
-            if(AXIS2_FALSE == continue_sending)
+            if(AXIS2_TRUE == continue_sending)
                 continue;
         }
+        /* 
+         *   This method is not implemented yet
+         *  update_msg(sender, env, msg_xtx);
+         */
         msg_type = SANDESHA2_MSG_CTX_GET_MSG_TYPE(rm_msg_ctx, env);
+        if(msg_type == SANDESHA2_MSG_TYPE_APPLICATION)
+        {
+            sandesha2_seq_t *seq = NULL;
+            axis2_char_t *seq_id = NULL;
+            sandesha2_identifier_t *identifier = NULL;
+            
+            seq = (sandesha2_seq_t*)
+                        SANDESHA2_MSG_CTX_GET_MSG_PART(rm_msg_ctx, 
+                        env, SANDESHA2_MSG_PART_SEQ);
+            identifier = SANDESHA2_SEQ_GET_IDENTIFIER(seq, env);
+            seq_id = SANDESHA2_IDENTIFIER_GET_IDENTIFIER(identifier, env);
+        }
         
         if(AXIS2_TRUE == sandesha2_sender_is_piggybackable_msg_type(sender, env,
                         msg_type) && AXIS2_FALSE  == 
                         sandesha2_sender_is_ack_already_piggybacked(sender, env,
                         rm_msg_ctx))
+        {
             sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx, 
                         storage_mgr);
+        }
         
         
         transport_out = AXIS2_MSG_CTX_GET_TRANSPORT_OUT_DESC(msg_ctx, env);
@@ -574,16 +602,34 @@
                         SANDESHA2_VALUE_TRUE, env));
         AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, 
                         SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
-        
-        bean1 = SANDESHA2_SENDER_MGR_RETRIEVE(mgr, env, 
-                        SANDESHA2_SENDER_BEAN_GET_MSG_ID(sender_bean, env));
+        msg_id = SANDESHA2_SENDER_BEAN_GET_MSG_ID(sender_bean, env);
+        bean1 = SANDESHA2_SENDER_MGR_RETRIEVE(mgr, env, msg_id);
         if(NULL != bean1)
         {
-            SANDESHA2_SENDER_BEAN_SET_SENT_COUNT(bean1, env, 
+            axis2_bool_t resend = AXIS2_FALSE;
+            
+            resend = SANDESHA2_SENDER_BEAN_IS_RESEND(sender_bean, env);
+            printf("resend:%d\n", resend);                
+            if(AXIS2_TRUE == resend)
+            {
+                SANDESHA2_SENDER_BEAN_SET_SENT_COUNT(bean1, env, 
                         SANDESHA2_SENDER_BEAN_GET_SENT_COUNT(sender_bean, env));
-            SANDESHA2_SENDER_BEAN_SET_TIME_TO_SEND(bean1, env, 
+                SANDESHA2_SENDER_BEAN_SET_TIME_TO_SEND(bean1, env, 
                         SANDESHA2_SENDER_BEAN_GET_TIME_TO_SEND(sender_bean, env));
-            SANDESHA2_SENDER_BEAN_MGR_UPDATE(mgr, env, bean1);
+                SANDESHA2_SENDER_MGR_UPDATE(mgr, env, bean1);
+            }
+            else
+            {
+                axis2_char_t *msg_stored_key = NULL;
+                
+                msg_id = SANDESHA2_SENDER_BEAN_GET_MSG_ID(bean1, env); 
+                SANDESHA2_SENDER_MGR_REMOVE(mgr, env, msg_id);
+                /* Removing the message from the storage */
+                msg_stored_key = SANDESHA2_SENDER_BEAN_GET_MSG_CONTEXT_REF_KEY(
+                    bean1, env);
+                SANDESHA2_STORAGE_MGR_REMOVE_MSG_CTX(storage_mgr, env, 
+                    msg_stored_key);
+            }
         }
         if(AXIS2_TRUE == successfully_sent)
         {
@@ -618,8 +664,12 @@
         AXIS2_MSG_CTX_SET_PROPERTY(msg_ctx, env, 
                         SANDESHA2_WITHIN_TRANSACTION, property, AXIS2_FALSE);
         /* TODO make transaction handling effective */
-        SANDESHA2_TRANSACTION_COMMIT(transaction, env);
+        if(transaction)
+        {
+            SANDESHA2_TRANSACTION_COMMIT(transaction, env);
+        }
     }
+    axis2_env_free(env); 
     
     return NULL;
 }

Modified: webservices/sandesha/trunk/c/src/wsrm/identifier.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/wsrm/identifier.c?rev=433159&r1=433158&r2=433159&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/wsrm/identifier.c (original)
+++ webservices/sandesha/trunk/c/src/wsrm/identifier.c Sun Aug 20 22:55:38 2006
@@ -39,17 +39,19 @@
 						const axis2_env_t *env);
     
 void* AXIS2_CALL 
-sandesha2_identifier_from_om_node(sandesha2_iom_rm_element_t *identifier,
-                    	const axis2_env_t *env, axiom_node_t *om_node);
+sandesha2_identifier_from_om_node(
+        sandesha2_iom_rm_element_t *identifier,
+        const axis2_env_t *env, axiom_node_t *om_node);
     
 axiom_node_t* AXIS2_CALL 
-sandesha2_identifier_to_om_node(sandesha2_iom_rm_element_t *identifier,
-                    	const axis2_env_t *env, void *om_node);
+sandesha2_identifier_to_om_node(
+        sandesha2_iom_rm_element_t *identifier,
+        const axis2_env_t *env, void *om_node);
                     	
 axis2_bool_t AXIS2_CALL 
 sandesha2_identifier_is_namespace_supported(
-                        sandesha2_iom_rm_element_t *identifier,
-                    	const axis2_env_t *env, axis2_char_t *namespace);
+        sandesha2_iom_rm_element_t *identifier,
+       	const axis2_env_t *env, axis2_char_t *namespace);
                     	
 axis2_char_t * AXIS2_CALL
 sandesha2_identifier_get_identifier(sandesha2_identifier_t *identifier,
@@ -66,20 +68,14 @@
 /***************************** End of function headers ************************/
 
 AXIS2_EXTERN sandesha2_identifier_t* AXIS2_CALL
-sandesha2_identifier_create(const axis2_env_t *env,  axis2_char_t *ns_val)
+sandesha2_identifier_create(
+        const axis2_env_t *env,  
+        axis2_char_t *ns_val)
 {
     sandesha2_identifier_impl_t *identifier_impl = NULL;
     AXIS2_ENV_CHECK(env, NULL);
     AXIS2_PARAM_CHECK(env->error, ns_val, NULL);
-    
-    if(AXIS2_FALSE == sandesha2_identifier_is_namespace_supported(
-                        (sandesha2_iom_rm_element_t*)identifier_impl, env, 
-                        ns_val))
-    {
-        AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_UNSUPPORTED_NS, 
-                            AXIS2_FAILURE);
-        return NULL;
-    }        
+
     identifier_impl =  (sandesha2_identifier_impl_t *)AXIS2_MALLOC 
                         (env->allocator, sizeof(sandesha2_identifier_impl_t));
 	
@@ -93,9 +89,17 @@
     identifier_impl->identifier.ops = NULL;
     identifier_impl->identifier.element.ops = NULL;
     
+    if(AXIS2_FALSE == sandesha2_identifier_is_namespace_supported(
+                        (sandesha2_iom_rm_element_t*)identifier_impl, env, 
+                        ns_val))
+    {
+        AXIS2_ERROR_SET(env->error, SANDESHA2_ERROR_UNSUPPORTED_NS, 
+                            AXIS2_FAILURE);
+        return NULL;
+    }        
     
     identifier_impl->identifier.ops = AXIS2_MALLOC(env->allocator,
-        sizeof(sandesha2_iom_rm_element_ops_t));
+        sizeof(sandesha2_identifier_ops_t));
     if(NULL == identifier_impl->identifier.ops)
 	{
 		sandesha2_identifier_free((sandesha2_iom_rm_element_t*)
@@ -104,7 +108,7 @@
         return NULL;
 	}
     identifier_impl->identifier.element.ops = AXIS2_MALLOC(env->allocator,
-        sizeof(sandesha2_identifier_ops_t));
+        sizeof(sandesha2_iom_rm_element_ops_t));
     if(NULL == identifier_impl->identifier.element.ops)
 	{
 		sandesha2_identifier_free((sandesha2_iom_rm_element_t*)
@@ -178,8 +182,9 @@
 
 
 void* AXIS2_CALL 
-sandesha2_identifier_from_om_node(sandesha2_iom_rm_element_t *identifier,
-                    	const axis2_env_t *env, axiom_node_t *om_node)
+sandesha2_identifier_from_om_node(
+        sandesha2_iom_rm_element_t *identifier,
+        const axis2_env_t *env, axiom_node_t *om_node)
 {
 	sandesha2_identifier_impl_t *identifier_impl = NULL;
     axiom_element_t *om_element = NULL;



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org