You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by da...@apache.org on 2008/06/17 16:54:04 UTC

svn commit: r668694 - in /webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008: include/ src/handlers/ src/msgprocessors/ src/storage/sqlite/

Author: damitha
Date: Tue Jun 17 07:54:04 2008
New Revision: 668694

URL: http://svn.apache.org/viewvc?rev=668694&view=rev
Log:
Dropped messages are now responded to properly.

Modified:
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c
    webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/include/sandesha2_sender_bean.h Tue Jun 17 07:54:04 2008
@@ -79,7 +79,7 @@
 sandesha2_sender_bean_set_send (
     sandesha2_sender_bean_t *sender,
     const axutil_env_t *env,
-axis2_bool_t send);
+    axis2_bool_t send);
 
 axis2_char_t* AXIS2_CALL
 sandesha2_sender_bean_get_internal_seq_id (

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/handlers/sandesha2_in_handler.c Tue Jun 17 07:54:04 2008
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 
+#include <axis2_engine.h>
 #include <axis2_handler_desc.h>
 #include <axutil_array_list.h>
 #include <axis2_svc.h>
@@ -58,6 +59,7 @@
 sandesha2_in_handler_process_dropped_msg(
     struct axis2_handler *handler, 
     const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
     sandesha2_msg_ctx_t *rm_msg_ctx,
     sandesha2_storage_mgr_t *storage_mgr,
     sandesha2_seq_property_mgr_t *seq_prop_mgr,
@@ -176,7 +178,7 @@
 
     if(dropped)
     {
-        sandesha2_in_handler_process_dropped_msg(handler, env, rm_msg_ctx, storage_mgr, 
+        sandesha2_in_handler_process_dropped_msg(handler, env, conf_ctx, rm_msg_ctx, storage_mgr, 
                 seq_prop_mgr, sender_mgr);
 
         AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "[sandesha2] msg_ctx dropped. So return here");
@@ -196,6 +198,7 @@
 
         return AXIS2_SUCCESS;
     }
+
     /* 
      * TODO Validate the message
      * sandesha2_msg_validator_validate(env, rm_msg_ctx);
@@ -208,6 +211,7 @@
         sandesha2_msg_processor_process_in_msg(ack_proc, env, rm_msg_ctx);
         sandesha2_msg_processor_free(ack_proc, env);
     }*/
+
     ack_requested = sandesha2_msg_ctx_get_ack_requested(rm_msg_ctx, env);
     if(ack_requested)
     {
@@ -404,11 +408,16 @@
     return AXIS2_FALSE;
 }
 
-
+/* Respond appropriately to a dropped message. In two-way messaging, if the
+ * response to the dropped application message has not been acknowledged, take the response
+ * message from the database and append an acknowledgment for the dropped message to it.
+ * Otherwise, send the acknowledgment for the dropped message.
+ */
 static axis2_status_t AXIS2_CALL
 sandesha2_in_handler_process_dropped_msg(
     struct axis2_handler *handler, 
     const axutil_env_t *env,
+    axis2_conf_ctx_t *conf_ctx,
     sandesha2_msg_ctx_t *rm_msg_ctx,
     sandesha2_storage_mgr_t *storage_mgr,
     sandesha2_seq_property_mgr_t *seq_prop_mgr,
@@ -425,16 +434,15 @@
         sandesha2_seq_t *sequence = NULL;
         axis2_char_t *rmd_sequence_id = NULL;
         
-        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "came1");
         sequence = sandesha2_msg_ctx_get_sequence(rm_msg_ctx, env);
         if(sequence)
+        {
             rmd_sequence_id = sandesha2_identifier_get_identifier(sandesha2_seq_get_identifier(sequence, 
                         env), env);
+        }
             
         if(rmd_sequence_id)
         {
-            AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "came2");
-
             sandesha2_seq_property_bean_t *rcvd_msgs_bean = NULL;
             axis2_char_t *rcvd_msgs_str = NULL;
             sandesha2_msg_processor_t *app_msg_processor = NULL;
@@ -444,17 +452,60 @@
 
             if(rcvd_msgs_bean)
             {
+                sandesha2_sender_bean_t *sender_bean = NULL;
+                axis2_char_t *internal_sequence_id = NULL;
+                long msg_no = -1;
+                sandesha2_sender_bean_t *find_sender_bean = NULL;
+
                 rcvd_msgs_str = sandesha2_seq_property_bean_get_value(rcvd_msgs_bean, env);
                 
-                AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "rcvd_msgs_str:%s", rcvd_msgs_str);
+                msg_no = sandesha2_msg_number_get_msg_num(sandesha2_seq_get_msg_num(sequence, env), env);
+                internal_sequence_id = sandesha2_utils_get_internal_sequence_id(env, rmd_sequence_id);
+                find_sender_bean = sandesha2_sender_bean_create(env);
+                sandesha2_sender_bean_set_msg_no(find_sender_bean, env, msg_no);
+                sandesha2_sender_bean_set_internal_seq_id(find_sender_bean, env, internal_sequence_id);
+                sandesha2_sender_bean_set_send(find_sender_bean, env, AXIS2_TRUE);
 
-                AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "came3");
-
-                app_msg_processor = sandesha2_app_msg_processor_create(env);
-                sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, rcvd_msgs_str, 
+                sender_bean = sandesha2_sender_mgr_find_unique(sender_mgr, env, find_sender_bean);
+                if(sender_bean)
+                {
+                    axis2_char_t *storage_key = NULL;
+                    axis2_msg_ctx_t *app_msg_ctx = NULL;
+                    sandesha2_msg_ctx_t *app_rm_msg_ctx = NULL;
+                    axis2_engine_t *engine = NULL;
+                    axis2_op_ctx_t *op_ctx = NULL;
+                    axis2_msg_ctx_t *in_msg_ctx = NULL;
+
+                    storage_key = sandesha2_sender_bean_get_msg_ctx_ref_key(sender_bean, env);
+                    app_msg_ctx = sandesha2_storage_mgr_retrieve_msg_ctx(storage_mgr, env, storage_key, 
+                            conf_ctx, AXIS2_TRUE);
+
+                    in_msg_ctx = sandesha2_msg_ctx_get_msg_ctx(rm_msg_ctx, env);
+
+                    axis2_msg_ctx_set_transport_out_stream(app_msg_ctx, env, 
+                            axis2_msg_ctx_get_transport_out_stream(in_msg_ctx, env));
+
+                    axis2_msg_ctx_set_out_transport_info(app_msg_ctx, env, 
+                            axis2_msg_ctx_get_out_transport_info(in_msg_ctx, env));
+
+                    op_ctx = axis2_msg_ctx_get_op_ctx(in_msg_ctx, env);
+
+                    axis2_op_ctx_set_response_written(op_ctx, env, AXIS2_TRUE);
+
+                    app_rm_msg_ctx = sandesha2_msg_init_init_msg(env, app_msg_ctx);
+                    sandesha2_msg_creator_add_ack_msg(env, app_rm_msg_ctx, rmd_sequence_id, seq_prop_mgr);
+                    engine = axis2_engine_create(env, conf_ctx);
+                    axis2_engine_send(engine, env, app_msg_ctx);
+                    sandesha2_msg_ctx_free(app_rm_msg_ctx, env);
+                }
+                else
+                {
+                    app_msg_processor = sandesha2_app_msg_processor_create(env);
+                    sandesha2_app_msg_processor_send_ack_if_reqd(env, rm_msg_ctx, rcvd_msgs_str, 
                         rmd_sequence_id, storage_mgr, sender_mgr, seq_prop_mgr);
-
-                sandesha2_msg_processor_free(app_msg_processor, env);
+                    
+                    sandesha2_msg_processor_free(app_msg_processor, env);
+                }
             }
         }
     }

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/msgprocessors/app_msg_processor.c Tue Jun 17 07:54:04 2008
@@ -2280,6 +2280,7 @@
     if (to_bean)
     {
         to_addr = axutil_strdup(env, sandesha2_seq_property_bean_get_value(to_bean, env));
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_to_addr1:%s", to_addr);
         to_epr = axis2_endpoint_ref_create(env, to_addr);
         sandesha2_seq_property_bean_free(to_bean, env);
     }
@@ -2478,6 +2479,36 @@
         sandesha2_seq_property_bean_free(from_acks_to_bean, env);
     }
 
+    app_msg_sender_bean = sandesha2_sender_bean_create(env);
+    sandesha2_sender_bean_set_internal_seq_id(app_msg_sender_bean, env, internal_sequence_id);
+    sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_sender_bean, env, storage_key);
+    millisecs = sandesha2_utils_get_current_time_in_millis(env);
+    sandesha2_sender_bean_set_time_to_send(app_msg_sender_bean, env, millisecs);
+    msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
+    sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
+    sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
+    sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
+
+    if(!rms_sequence_id)
+    {
+        sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_FALSE);
+    }
+    else
+    {
+        sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
+        property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
+        axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
+    }
+
+    /**
+     * When we store application message context as below it should be noted
+     * that at Sandesha2/C client application side this is actually stored in
+     * in-memory whereas in the web service side it is actually stored in
+     * database only.
+     */
+    sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
+    sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, app_msg_ctx);
+
     is_svr_side = axis2_msg_ctx_get_server_side(app_msg_ctx, env);
 
     /* 
@@ -2495,6 +2526,7 @@
 
         engine = axis2_engine_create(env, conf_ctx);
         status = axis2_engine_resume_send(engine, env, app_msg_ctx);
+
         if(engine)
         {
             axis2_engine_free(engine, env);
@@ -2535,36 +2567,6 @@
         mep = axis2_op_get_msg_exchange_pattern(op, env);
     }
 
-    app_msg_sender_bean = sandesha2_sender_bean_create(env);
-    sandesha2_sender_bean_set_internal_seq_id(app_msg_sender_bean, env, internal_sequence_id);
-    sandesha2_sender_bean_set_msg_ctx_ref_key(app_msg_sender_bean, env, storage_key);
-    millisecs = sandesha2_utils_get_current_time_in_millis(env);
-    sandesha2_sender_bean_set_time_to_send(app_msg_sender_bean, env, millisecs);
-    msg_id = sandesha2_msg_ctx_get_msg_id(rm_msg_ctx, env);
-    sandesha2_sender_bean_set_msg_id(app_msg_sender_bean, env, msg_id);
-    sandesha2_sender_bean_set_msg_no(app_msg_sender_bean, env, msg_num);
-    sandesha2_sender_bean_set_msg_type(app_msg_sender_bean, env, SANDESHA2_MSG_TYPE_APPLICATION);
-
-    if(!rms_sequence_id)
-    {
-        sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_FALSE);
-    }
-    else
-    {
-        sandesha2_sender_bean_set_send(app_msg_sender_bean, env, AXIS2_TRUE);
-        property = axutil_property_create_with_args(env, 0, 0, 0, AXIS2_VALUE_TRUE);
-        axis2_msg_ctx_set_property(app_msg_ctx, env, SANDESHA2_SET_SEND_TO_TRUE, property);
-    }
-
-    /**
-     * When we store application message context as below it should be noted
-     * that at Sandesha2/C client application side this is actually stored in
-     * in-memory whereas in the web service side it is actually stored in
-     * database only.
-     */
-    sandesha2_sender_mgr_insert(sender_mgr, env, app_msg_sender_bean);
-    sandesha2_storage_mgr_store_msg_ctx(storage_mgr, env, storage_key, app_msg_ctx);
-
     continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, app_msg_sender_bean, 
             conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
 
@@ -2677,14 +2679,14 @@
             {
                 transport_sender = axis2_transport_out_desc_get_sender(transport_out, env);
             }
-            if(transport_sender)
+            if(!transport_sender)
             {
                 AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
                         "[sandesha2] Transport sender could not be retrieved from transport_out");
                 status = AXIS2_FAILURE;
             }
 
-            while(AXIS2_TRUE)
+            while(AXIS2_TRUE && transport_sender)
             {
                 continue_sending = sandesha2_msg_retrans_adjuster_adjust_retrans(env, sender_bean, 
                         conf_ctx, storage_mgr, seq_prop_mgr, create_seq_mgr, sender_mgr);
@@ -2704,15 +2706,13 @@
                 }
 
                 AXIS2_SLEEP(retrans_interval);
-                if(transport_sender)
+
+                /* This is necessary to avoid a double free */
+                axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
+                if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
                 {
-                    /* This is neccessary to avoid a double free */
-                    axis2_msg_ctx_set_property(app_msg_ctx, env, AXIS2_TRANSPORT_IN, NULL);
-                    if(!AXIS2_TRANSPORT_SENDER_INVOKE(transport_sender, env, app_msg_ctx))
-                    {
-                        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
-                                "[sandesha2] Transport sender invoke failed in sending application message");
-                    }
+                    AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, 
+                            "[sandesha2] Transport sender invoke failed in sending application message");
                 }
 
                 if(!axis2_msg_ctx_get_server_side(app_msg_ctx, env))

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_bean_mgr.c Tue Jun 17 07:54:04 2008
@@ -128,9 +128,15 @@
         if(0 == axutil_strcmp(col_name[i], "op_mep"))
             if(argv[i])
                 sandesha2_msg_store_bean_set_op_mep(bean, env, argv[i]);
+
         if(0 == axutil_strcmp(col_name[i], "to_url"))
-            if(argv[i])
+        {
+            if(argv[i] && axutil_strcmp("(null)", argv[i]))
+            {
                 sandesha2_msg_store_bean_set_to_url(bean, env, argv[i]);
+            }
+        }
+
         if(0 == axutil_strcmp(col_name[i], "transport_to"))
             if(argv[i] && 0 != axutil_strcmp("(null)", argv[i]))
             {

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_sender_mgr.c Tue Jun 17 07:54:04 2008
@@ -552,6 +552,7 @@
                 " and  send = %d", send);
     }
     sprintf(sql_find + axutil_strlen(sql_find), ";");
+    AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_sql_find:%s", sql_find);
     find_list = sandesha2_permanent_bean_mgr_find(sender_mgr_impl->bean_mgr, env, 
         sandesha2_sender_find_callback, sql_find);
 
@@ -608,10 +609,12 @@
     {
         sprintf(sql_find + axutil_strlen(sql_find), "time_to_send <= %ld ", time_now);
     }*/
+    
+    sprintf(sql_find + axutil_strlen(sql_find), "msg_type='%d'", SANDESHA2_MSG_TYPE_APPLICATION);
 
     if(seq_id)
     {
-        sprintf(sql_find + axutil_strlen(sql_find), "internal_seq_id='%s'", seq_id);
+        sprintf(sql_find + axutil_strlen(sql_find), "and internal_seq_id='%s'", seq_id);
     }
     
     if(msg_id)
@@ -619,8 +622,6 @@
         sprintf(sql_find + axutil_strlen(sql_find), "and msg_id='%s'", msg_id);
     }
         
-    sprintf(sql_find + axutil_strlen(sql_find), "and msg_type='%d'", SANDESHA2_MSG_TYPE_APPLICATION);
-
     sprintf(sql_find + axutil_strlen(sql_find), " and send=%d", AXIS2_TRUE);
 
     AXIS2_LOG_DEBUG(env->log, AXIS2_LOG_SI, "sql_find:%s", sql_find);  

Modified: webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c
URL: http://svn.apache.org/viewvc/webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c?rev=668694&r1=668693&r2=668694&view=diff
==============================================================================
--- webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c (original)
+++ webservices/sandesha/tags/sandesha2/c/worker_thread_removed-23may2008/src/storage/sqlite/permanent_storage_mgr.c Tue Jun 17 07:54:04 2008
@@ -457,7 +457,9 @@
         axiom_soap_builder_free(soap_builder, env);
         return NULL;
     }
+
     axis2_msg_ctx_set_soap_envelope(msg_ctx, env, soap_envelope);
+
     axis2_msg_ctx_set_msg_id(msg_ctx, env, sandesha2_msg_store_bean_get_msg_id(
         msg_store_bean, env));
 
@@ -484,7 +486,7 @@
         svc = axis2_conf_get_svc(conf, env, svc_name_str);
         if(svc)
             axis2_msg_ctx_set_svc(msg_ctx, env, svc);
-        else
+        /*else
         {
             AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "Cannot build message "\
                  "context since service %s is not present", svc_name_str);
@@ -493,7 +495,7 @@
                 AXIS2_FAILURE);
             axiom_soap_builder_free(soap_builder, env);
             return NULL;
-        }
+        */
     }
 
     op_name_str = sandesha2_msg_store_bean_get_op(msg_store_bean, env);
@@ -587,11 +589,13 @@
         env);
     if(transport_to_str)
     {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_transport_to_str:%s", transport_to_str);
         axis2_msg_ctx_set_transport_url(msg_ctx, env, transport_to_str);
     }
     to_url_str = sandesha2_msg_store_bean_get_to_url(msg_store_bean, env);
     if(to_url_str)
     {
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_to_url_str:%s", to_url_str);
         endpoint_ref = axis2_endpoint_ref_create(env, to_url_str);
         axis2_msg_ctx_set_to(msg_ctx, env, endpoint_ref);
     }
@@ -776,6 +780,7 @@
     if(to)
     {
         address = (axis2_char_t *) axis2_endpoint_ref_get_address(to, env);
+        AXIS2_LOG_ERROR(env->log, AXIS2_LOG_SI, "dam_address:%s", address);
         sandesha2_msg_store_bean_set_to_url(bean, env, address);
     }
     reply_to = axis2_msg_ctx_get_reply_to(msg_ctx, env);



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org