You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2007/12/05 07:07:55 UTC

svn commit: r601217 - in /webservices/sandesha/trunk/c: include/sandesha2_sender_worker.h src/storage/sqlite/permanent_storage_mgr.c src/util/msg_creator.c src/workers/sender.c src/workers/sender_worker.c

Author: damitha
Date: Tue Dec  4 22:07:54 2007
New Revision: 601217

URL: http://svn.apache.org/viewvc?rev=601217&view=rev
Log:
removed the sender worker thread. Instead sender use a function to send messages.

Modified:
    webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h
    webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c
    webservices/sandesha/trunk/c/src/util/msg_creator.c
    webservices/sandesha/trunk/c/src/workers/sender.c
    webservices/sandesha/trunk/c/src/workers/sender_worker.c

Modified: webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h (original)
+++ webservices/sandesha/trunk/c/include/sandesha2_sender_worker.h Tue Dec  4 22:07:54 2007
@@ -95,6 +95,13 @@
     const axutil_env_t *env,
     const axis2_bool_t persistent_msg_ctx);
 
+axis2_status_t
+sandesha2_sender_worker_send(
+    axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *msg_id,
+    axis2_bool_t persistent_msg_ctx);
+
 void sandesha2_sender_worker_set_transport_out(
     sandesha2_sender_worker_t *sender_worker,
     const axutil_env_t *env,

Modified: webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c (original)
+++ webservices/sandesha/trunk/c/src/storage/sqlite/permanent_storage_mgr.c Tue Dec  4 22:07:54 2007
@@ -816,9 +816,9 @@
     sandesha2_msg_store_bean_t *msg_store_bean = NULL;
 
     storage_mgr_impl = SANDESHA2_INTF_TO_IMPL(storage_mgr);
-    /*if(!persistent)
+    if(!persistent)
         msg_ctx = (axis2_msg_ctx_t *) axutil_hash_get(
-            storage_mgr_impl->msg_ctx_map, key, AXIS2_HASH_KEY_STRING);*/
+            storage_mgr_impl->msg_ctx_map, key, AXIS2_HASH_KEY_STRING);
     if(msg_ctx)
         return msg_ctx;
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2]retrieved from database");

Modified: webservices/sandesha/trunk/c/src/util/msg_creator.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/util/msg_creator.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/util/msg_creator.c (original)
+++ webservices/sandesha/trunk/c/src/util/msg_creator.c Tue Dec  4 22:07:54 2007
@@ -803,6 +803,11 @@
                     {
                         axutil_property_set_own_value(new_prop, env, AXIS2_FALSE);
                     }
+                    if(0 == axutil_strcmp(AXIS2_HTTP_CLIENT, key))
+                    {
+                        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "dam_axis2_http_client");
+                        axutil_property_set_own_value(new_prop, env, AXIS2_FALSE);
+                    }
                     axutil_hash_set(new_msg_props, key, AXIS2_HASH_KEY_STRING, new_prop);
                 }
             }

Modified: webservices/sandesha/trunk/c/src/workers/sender.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender.c Tue Dec  4 22:07:54 2007
@@ -262,7 +262,6 @@
         sandesha2_transaction_t *transaction = NULL;
         sandesha2_sender_mgr_t *mgr = NULL;
         sandesha2_sender_bean_t *sender_bean = NULL;
-        sandesha2_sender_worker_t *sender_worker = NULL;
         axis2_char_t *msg_id = NULL;
 
         transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
@@ -282,14 +281,9 @@
         if(msg_id)
         {
             axis2_bool_t status = AXIS2_TRUE;
-            /* Start a sender worker which will work on this message */
-            sender_worker = sandesha2_sender_worker_create(env, sender->conf_ctx, 
-                msg_id);
-            sandesha2_sender_worker_run(sender_worker, env, 
+            status = sandesha2_sender_worker_send(env, sender->conf_ctx, msg_id, 
                 sender->persistent_msg_ctx);
             AXIS2_SLEEP(sleep_time * 2); 
-            status = sandesha2_sender_worker_get_status(
-                sender_worker, env);
             if(!status)
             {
                 AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 

Modified: webservices/sandesha/trunk/c/src/workers/sender_worker.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/c/src/workers/sender_worker.c?rev=601217&r1=601216&r2=601217&view=diff
==============================================================================
--- webservices/sandesha/trunk/c/src/workers/sender_worker.c (original)
+++ webservices/sandesha/trunk/c/src/workers/sender_worker.c Tue Dec  4 22:07:54 2007
@@ -299,8 +299,12 @@
     {
         axutil_allocator_switch_to_global_pool(env->allocator);
         if(sender_worker->persistent_msg_ctx)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                "[sandesha2]Retrieving msg_ctx from database");
             msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, 
                 key, sender_worker->conf_ctx, AXIS2_FALSE);
+        }
         else
             msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, 
                 key, sender_worker->conf_ctx, AXIS2_TRUE);
@@ -443,7 +447,6 @@
         if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx))
 		{
         	successfully_sent = AXIS2_TRUE;
-        	sender_worker->counter++;
 		}else
 		{
         	successfully_sent = AXIS2_FALSE;
@@ -540,6 +543,288 @@
     AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
         "[sandesha2] Exit:sandesha2_sender_worker_worker_func\n");        
     return NULL;
+
+}
+
+axis2_status_t
+sandesha2_sender_worker_send(
+    axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
+    axis2_char_t *msg_id,
+    axis2_bool_t persistent_msg_ctx)
+{
+    sandesha2_sender_worker_t *sender_worker = NULL;
+    sandesha2_storage_mgr_t *storage_mgr = NULL;
+    sandesha2_transaction_t *transaction = NULL;
+    sandesha2_sender_bean_t *sender_worker_bean = NULL;
+    sandesha2_sender_bean_t *bean1 = NULL;
+    sandesha2_sender_mgr_t *sender_mgr = NULL;
+    axis2_char_t *key = NULL;
+    axutil_property_t *property = NULL;
+    axis2_bool_t continue_sending = AXIS2_TRUE;
+    axis2_char_t *qualified_for_sending = NULL;
+    axis2_msg_ctx_t *msg_ctx = NULL;
+    sandesha2_msg_ctx_t *rm_msg_ctx = NULL;
+    sandesha2_property_bean_t *prop_bean = NULL;
+    axutil_array_list_t *msgs_not_to_send = NULL;
+    int msg_type = -1;
+    axis2_transport_out_desc_t *transport_out = NULL;
+    axis2_transport_sender_t *transport_sender = NULL;
+    axis2_bool_t successfully_sent = AXIS2_FALSE;
+    axis2_status_t status = AXIS2_SUCCESS;
+
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2]Entry:sandesha2_sender_worker_send");        
+    
+    storage_mgr = sandesha2_utils_get_storage_mgr(env, 
+        conf_ctx, axis2_conf_ctx_get_conf(conf_ctx, env));
+    transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
+    sender_mgr = sandesha2_storage_mgr_get_retrans_mgr(storage_mgr, env);
+    sender_worker_bean = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+    if(!sender_worker_bean)
+    {
+        AXIS2_LOG_WARNING(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] sender_worker_bean is NULL");
+        sandesha2_transaction_rollback(transaction, env);
+        return AXIS2_FAILURE;
+    }
+
+    key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_worker_bean, env);
+    if(!msg_ctx)
+    {
+        axutil_allocator_switch_to_global_pool(env->allocator);
+        if(persistent_msg_ctx)
+        {
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+                "[sandesha2]Retrieving msg_ctx from database");
+            msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, 
+                key, conf_ctx, AXIS2_FALSE);
+        }
+        else
+            msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, 
+                key, conf_ctx, AXIS2_TRUE);
+        axutil_allocator_switch_to_local_pool(env->allocator);
+    }
+    if(!msg_ctx)
+    {
+        sandesha2_transaction_rollback(transaction, env);
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx is "\
+            "not present in the store");
+        return AXIS2_FAILURE;
+    }
+    property = axis2_msg_ctx_get_property(msg_ctx, env, 
+        SANDESHA2_WITHIN_TRANSACTION);
+    if(property)
+        axutil_property_set_value(property, env, AXIS2_VALUE_TRUE);
+    else
+    {
+        property = axutil_property_create_with_args(env, 0, 0, 0, 
+            AXIS2_VALUE_TRUE);
+        axis2_msg_ctx_set_property(msg_ctx, env, SANDESHA2_WITHIN_TRANSACTION,
+            property);
+    }
+    continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env,
+        sender_worker_bean, conf_ctx, storage_mgr);
+    sandesha2_sender_mgr_update(sender_mgr, env, sender_worker_bean);
+    if(!continue_sending)
+    {
+        status = AXIS2_FAILURE;
+        /* We commit here since we have cleaned the
+         * sending side data and that need to commited */
+        sandesha2_transaction_commit(transaction, env);
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Do not continue sending the message");
+        return status;
+    }
+    
+    property = axis2_msg_ctx_get_property(msg_ctx, env, 
+        SANDESHA2_QUALIFIED_FOR_SENDING);
+    if(property)
+        qualified_for_sending = axutil_property_get_value(property, env);
+    if(qualified_for_sending && 0 != axutil_strcmp(
+        qualified_for_sending, AXIS2_VALUE_TRUE))
+    {
+        sandesha2_transaction_rollback(transaction, env);
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+            "[sandesha2] Message is not qualified for sending");
+        return AXIS2_FAILURE;
+    }
+    rm_msg_ctx = sandesha2_msg_init_init_msg(env, msg_ctx);
+    
+    prop_bean = sandesha2_utils_get_property_bean(env, 
+        axis2_conf_ctx_get_conf(conf_ctx, env));
+    if(prop_bean)
+        msgs_not_to_send = sandesha2_property_bean_get_msg_types_to_drop(
+            prop_bean, env);
+    if(msgs_not_to_send)
+    {
+        int j = 0;
+        axis2_bool_t continue_sending = AXIS2_FALSE;
+
+        for(j = 0; j < axutil_array_list_size(msgs_not_to_send, env); j++)
+        {
+            axis2_char_t *value = NULL;
+            int int_val = -1;
+            int msg_type = -1;
+            
+            value = axutil_array_list_get(msgs_not_to_send, env, j);
+            int_val = atoi(value);
+            msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+            if(msg_type == int_val)
+                continue_sending = AXIS2_TRUE;
+        }
+        if(continue_sending)
+        {
+            sandesha2_transaction_rollback(transaction, env);
+            AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] Continue "\
+                "Sending is true. So returning from Sender Worker");
+            return AXIS2_SUCCESS;
+        }
+    }
+    /* 
+     *  This method is not implemented yet
+     *  update_msg(sender_worker, env, msg_xtx);
+     */
+    msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+    if(msg_type == SANDESHA2_MSG_TYPE_APPLICATION)
+    {
+        sandesha2_seq_t *seq = NULL;
+        axis2_char_t *seq_id = NULL;
+        sandesha2_identifier_t *identifier = NULL;
+        
+        seq = (sandesha2_seq_t*)
+                    sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, 
+                    env, SANDESHA2_MSG_PART_SEQ);
+        identifier = sandesha2_seq_get_identifier(seq, env);
+        seq_id = sandesha2_identifier_get_identifier(identifier, env);
+    }
+    if(sandesha2_sender_worker_is_piggybackable_msg_type(sender_worker, env,
+        msg_type) && AXIS2_FALSE  == 
+        sandesha2_sender_worker_is_ack_already_piggybacked(sender_worker, env,
+        rm_msg_ctx))
+    {
+        sandesha2_ack_mgr_piggyback_acks_if_present(env, rm_msg_ctx, 
+            storage_mgr);
+    }
+    
+    if(!transport_out) 
+        transport_out = axis2_msg_ctx_get_transport_out_desc(msg_ctx, env);
+    transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
+    if(transport_sender)
+    {
+        sandesha2_transaction_commit(transaction, env);
+        property = axis2_msg_ctx_get_property(msg_ctx, env, 
+            SANDESHA2_WITHIN_TRANSACTION);
+        if(property)
+            axutil_property_set_value(property, env, AXIS2_VALUE_FALSE);
+        else
+        {
+            property = axutil_property_create_with_args(env, 0, 0, 0,
+                AXIS2_VALUE_FALSE);
+            axis2_msg_ctx_set_property(msg_ctx, env, 
+                SANDESHA2_WITHIN_TRANSACTION, property);
+        }
+        /* This is neccessary to avoid a double free */
+        axis2_msg_ctx_set_property(msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
+        /* Consider building soap envelope */
+        if(AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, msg_ctx))
+		{
+        	successfully_sent = AXIS2_TRUE;
+		}else
+		{
+        	successfully_sent = AXIS2_FALSE;
+		}
+    }
+    transaction = sandesha2_storage_mgr_get_transaction(storage_mgr, env);
+    property = axis2_msg_ctx_get_property(msg_ctx, env, 
+        SANDESHA2_WITHIN_TRANSACTION); 
+    if(property)
+        axutil_property_set_value(property, env, AXIS2_VALUE_TRUE);
+    else
+    {
+        property = axutil_property_create_with_args(env, 0, 0, 0,
+            AXIS2_VALUE_TRUE);
+        axis2_msg_ctx_set_property(msg_ctx, env, 
+            SANDESHA2_WITHIN_TRANSACTION, property);
+    }
+    msg_id = sandesha2_sender_bean_get_msg_id((sandesha2_rm_bean_t *) 
+        sender_worker_bean, env);
+    bean1 = sandesha2_sender_mgr_retrieve(sender_mgr, env, msg_id);
+    if(bean1)
+    {
+        axis2_bool_t resend = AXIS2_FALSE;
+        
+        resend = sandesha2_sender_bean_is_resend(sender_worker_bean, env);
+        if(resend)
+        {
+            sandesha2_sender_bean_set_sent_count(bean1, env, 
+                sandesha2_sender_bean_get_sent_count(sender_worker_bean, env));
+            sandesha2_sender_bean_set_time_to_send(bean1, env, 
+                sandesha2_sender_bean_get_time_to_send(sender_worker_bean, env));
+            sandesha2_sender_mgr_update(sender_mgr, env, bean1);
+        }
+        else
+        {
+            axis2_char_t *msg_stored_key = NULL;
+            
+            msg_id = sandesha2_sender_bean_get_msg_id((sandesha2_rm_bean_t *) 
+                bean1, env); 
+            sandesha2_sender_mgr_remove(sender_mgr, env, msg_id);
+            /* Removing the message from the storage */
+            msg_stored_key = sandesha2_sender_bean_get_msg_ctx_ref_key(
+                bean1, env);
+            sandesha2_storage_mgr_remove_msg_ctx(storage_mgr, env, 
+                msg_stored_key);
+        }
+    }
+    if(successfully_sent)
+    {
+        if(AXIS2_FALSE == axis2_msg_ctx_get_server_side(msg_ctx, env))
+            sandesha2_sender_worker_check_for_sync_res(env, msg_ctx);
+    }
+    msg_type = sandesha2_msg_ctx_get_msg_type(rm_msg_ctx, env);
+    if(SANDESHA2_MSG_TYPE_TERMINATE_SEQ == msg_type)
+    {
+        sandesha2_terminate_seq_t *terminate_seq = NULL;
+        axis2_char_t *seq_id = NULL;
+        axis2_conf_ctx_t *conf_ctx = NULL;
+        axis2_char_t *internal_seq_id = NULL;
+        
+        terminate_seq = (sandesha2_terminate_seq_t*)
+                    sandesha2_msg_ctx_get_msg_part(rm_msg_ctx, env, 
+                    SANDESHA2_MSG_PART_TERMINATE_SEQ);
+        seq_id = sandesha2_identifier_get_identifier(
+                    sandesha2_terminate_seq_get_identifier(terminate_seq, 
+                    env), env);
+        conf_ctx = axis2_msg_ctx_get_conf_ctx(msg_ctx, env);
+        internal_seq_id = sandesha2_utils_get_seq_property(env, seq_id, 
+                    SANDESHA2_SEQ_PROP_INTERNAL_SEQ_ID, storage_mgr);
+        sandesha2_terminate_mgr_terminate_sending_side(env, conf_ctx,
+            internal_seq_id, axis2_msg_ctx_get_server_side(msg_ctx, env), 
+                storage_mgr);
+        /* We have no more messages for this sequence. So continue send 
+         * status is false*/
+        status = AXIS2_FAILURE;
+    }
+    property = axis2_msg_ctx_get_property(msg_ctx, env, 
+        SANDESHA2_WITHIN_TRANSACTION);
+    if(property)
+        axutil_property_set_value(property, env, AXIS2_VALUE_FALSE);
+    else
+    {
+        property = axutil_property_create_with_args(env, 0, 0, 0, 
+            AXIS2_VALUE_FALSE);
+        axis2_msg_ctx_set_property(msg_ctx, env, 
+                    SANDESHA2_WITHIN_TRANSACTION, property);
+    }
+    /* TODO make transaction handling effective */
+    if(transaction)
+    {
+        sandesha2_transaction_commit(transaction, env);
+    }
+    AXIS2_LOG_TRACE(env->log, AXIS2_LOG_SI, 
+        "[sandesha2]Exit:sandesha2_sender_worker_send");        
+    return status;
 }
 
 static axis2_bool_t AXIS2_CALL
@@ -586,7 +871,6 @@
    
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI,
         "[sandesha2] Start:sandesha2_sender_worker_check_for_sync_res");
-    AXIS2_ENV_CHECK(env, AXIS2_FAILURE);
     AXIS2_PARAM_CHECK(env->error, msg_ctx, AXIS2_FAILURE);
     /*property = axis2_msg_ctx_get_property(msg_ctx, env, AXIS2_TRANSPORT_IN);
     if(!property)
@@ -646,8 +930,12 @@
 
     res_envelope = axis2_msg_ctx_get_response_soap_envelope(msg_ctx, env);
     if(!res_envelope)
+    {
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+            "[sandesha2]Response envelope not found");
         res_envelope = axis2_http_transport_utils_create_soap_msg(env, msg_ctx,
             soap_ns_uri);
+    }
     
     property = axis2_msg_ctx_get_property(msg_ctx, env, 
         SANDESHA2_WITHIN_TRANSACTION);
@@ -659,6 +947,8 @@
     if(res_envelope)
     {
         axis2_engine_t *engine = NULL;
+        AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, 
+            "[sandesha2]Response envelope found");
         axis2_msg_ctx_set_soap_envelope(res_msg_ctx, env, res_envelope);
        
         engine = axis2_engine_create(env, axis2_msg_ctx_get_conf_ctx(msg_ctx, 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org