You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2018/12/10 19:48:04 UTC
[avro] branch master updated: Change the error module to be
thread-safe on Windows as well as on UNIX (depending on the THREADSAFE macro
being defined). Add an appropriate test which launches several threads and validates
that the errors they set/add are thread-safe.
This is an automated email from the ASF dual-hosted git repository.
dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new e37a52d Change the error module to be thread-safe on Windows as well as on UNIX (depending on the THREADSAFE macro being defined). Add an appropriate test which launches several threads and validates that the errors they set/add are thread-safe.
e37a52d is described below
commit e37a52d909b913059ba4493f509ede7d46dcf0e1
Author: Eliyahu-Machluf <el...@attunity.com>
AuthorDate: Sun Nov 27 12:09:02 2016 +0200
Change the error module to be thread-safe on Windows as well as on UNIX (depending on the THREADSAFE macro being defined).
Add an appropriate test which launches several threads and validates that the errors they set/add are thread-safe.
---
lang/c/src/errors.c | 48 ++++--
lang/c/tests/test_avro_errors_are_thread_safe.c | 203 ++++++++++++++++++++++++
2 files changed, 238 insertions(+), 13 deletions(-)
diff --git a/lang/c/src/errors.c b/lang/c/src/errors.c
index a033be7..23934a6 100644
--- a/lang/c/src/errors.c
+++ b/lang/c/src/errors.c
@@ -22,17 +22,6 @@
#include "avro/errors.h"
-#if defined THREADSAFE && (defined __unix__ || defined __unix)
-#include <pthread.h>
-static pthread_key_t error_data_key;
-static pthread_once_t error_data_key_once = PTHREAD_ONCE_INIT;
-
-static void make_error_data_key()
-{
- pthread_key_create(&error_data_key, free);
-}
-#endif
-
/* 4K should be enough, right? */
#define AVRO_ERROR_SIZE 4096
@@ -52,10 +41,30 @@ struct avro_error_data_t {
};
+#if defined THREADSAFE
+#if ( defined __unix__ || defined __unix )
+#include <pthread.h>
+static pthread_key_t error_data_key;
+static pthread_once_t error_data_key_once = PTHREAD_ONCE_INIT;
+
+static void make_error_data_key() /* creates error_data_key with free() as its destructor; run through pthread_once() in avro_get_error_data() */
+{
+ pthread_key_create(&error_data_key, free); /* NOTE(review): the return value is not checked */
+}
+#elif defined _WIN32
+#include <Windows.h>
+
+static __declspec( thread ) struct avro_error_data_t TLS_ERROR_DATA = { "", "", NULL, NULL }; /* Windows error state declared with __declspec( thread ); avro_get_error_data() sets the AVRO_CURRENT_ERROR/AVRO_OTHER_ERROR pointers on first use */
+
+#endif /* unix||_unix||_WIN32 */
+#endif /* THREADSAFE */
+
static struct avro_error_data_t *
avro_get_error_data(void)
{
-#if defined THREADSAFE && (defined __unix__ || defined __unix)
+#if defined THREADSAFE
+#if defined __unix__ || defined __unix
+
pthread_once(&error_data_key_once, make_error_data_key);
struct avro_error_data_t *ERROR_DATA =
@@ -72,7 +81,20 @@ avro_get_error_data(void)
}
return ERROR_DATA;
-#else
+
+#elif defined _WIN32
+
+ if ( TLS_ERROR_DATA.AVRO_CURRENT_ERROR == NULL )
+ {
+ //first usage of the ERROR_DATA, initialize 'current' and 'other' pointers.
+ TLS_ERROR_DATA.AVRO_CURRENT_ERROR = TLS_ERROR_DATA.AVRO_ERROR1;
+ TLS_ERROR_DATA.AVRO_OTHER_ERROR = TLS_ERROR_DATA.AVRO_ERROR2;
+ }
+ return &TLS_ERROR_DATA;
+
+ #endif /* UNIX and WIN32 threadsafe handling */
+
+#else /* not thread-safe */
static struct avro_error_data_t ERROR_DATA = {
/* .AVRO_ERROR1 = */ {'\0'},
/* .AVRO_ERROR2 = */ {'\0'},
diff --git a/lang/c/tests/test_avro_errors_are_thread_safe.c b/lang/c/tests/test_avro_errors_are_thread_safe.c
new file mode 100644
index 0000000..8e2f70d
--- /dev/null
+++ b/lang/c/tests/test_avro_errors_are_thread_safe.c
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <Windows.h>
+#define THR_HANDLE HANDLE
+#define LAST_ERROR() GetLastError()
+#define SLEEP_MILLIS(millis) Sleep( millis )
+#else
+#include <pthread.h>
+#include <errno.h>
+#include <unistd.h>
+#define THR_HANDLE pthread_t
+#define LAST_ERROR() errno
+#define SLEEP_MILLIS(millis) usleep( millis * 1000 )
+#endif
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <avro.h>
+
+enum {
+ NUMBER_OF_TEST_THREADS = 64, /* number of worker threads main() launches and joins */
+ THREAD_ERROR_BUFFER_SIZE = 1024, /* size of TEST_THREAD_DATA.error_message */
+ MAX_THREAD_SLEEP_MILLIS = 3000, /* passed to get_random_value(); each thread's sleep interval is below this value */
+};
+
+typedef struct _TEST_THREAD_DATA /* per-thread input and result record shared between main() and one worker */
+{
+ int index; /* thread number assigned by main(); embedded in the error text the worker sets */
+ int error_occured; /* set to 1 by the worker when a validation fails */
+ char error_message[THREAD_ERROR_BUFFER_SIZE]; /* description of the failed validation, printed by main() */
+ volatile int worker_thread; /* NOTE(review): not referenced anywhere in this test - confirm it is needed */
+ int sleep_interval_millis; /* delay in milliseconds the worker sleeps between its steps */
+}TEST_THREAD_DATA;
+
+static int get_random_value( int max_value );
+#ifdef _WIN32
+static DWORD worker_thread( LPVOID lpThreadParameter );
+#else
+static void *worker_thread( void *p );
+#endif
+static THR_HANDLE launch_thread( void *thread_context );
+static int join_thread( THR_HANDLE thread_handle );
+
+int main() /* launches the worker threads, joins them all, then fails if any worker recorded a validation error */
+{
+ TEST_THREAD_DATA threads_data[NUMBER_OF_TEST_THREADS];
+ THR_HANDLE thread_handle[NUMBER_OF_TEST_THREADS];
+ unsigned i;
+ int found_error = 0;
+
+ srand( (unsigned)time( NULL ) ); /* seed rand(), which get_random_value() uses for the sleep intervals */
+
+ memset( threads_data, 0, sizeof(threads_data) );
+ for ( i = 0; i < NUMBER_OF_TEST_THREADS; i++ )
+ {
+ threads_data[i].index = i;
+ threads_data[i].sleep_interval_millis = get_random_value( MAX_THREAD_SLEEP_MILLIS );
+ thread_handle[i] = launch_thread( &threads_data[i] );
+ if ( !thread_handle[i] ) /* a zero/NULL handle is treated as a failed launch */
+ {
+ fprintf( stderr, "failed to launch worker thread, error code is %d\n", LAST_ERROR() ); /* NOTE(review): on the pthread build LAST_ERROR() is errno, but launch_thread() drops pthread_create()'s status instead of storing it there - confirm the printed code is meaningful */
+ return EXIT_FAILURE; /* returns without joining the threads already started */
+ }
+ }
+
+ for ( i = 0; i < NUMBER_OF_TEST_THREADS; i++ )
+ if ( join_thread( thread_handle[i] ) != 0 )
+ {
+ fprintf( stderr, "failed to join thread %d, error code is %d\n", i, LAST_ERROR() ); /* NOTE(review): 'i' is unsigned but printed with %d */
+ return EXIT_FAILURE;
+ }
+
+ for ( i = 0; i < NUMBER_OF_TEST_THREADS; i++ ) /* all workers have finished; report every recorded failure */
+ {
+ if( threads_data[i].error_occured )
+ {
+ fprintf( stderr, "error occured at thread %d: %s\n", i, threads_data[i].error_message );
+ found_error = 1;
+ }
+ }
+ if ( found_error )
+ return EXIT_FAILURE;
+
+// printf( "test ended successfully\n");
+ return EXIT_SUCCESS;
+}
+
+#ifdef _WIN32
+static DWORD worker_thread( LPVOID context )
+#else
+static void *worker_thread( void *context )
+#endif
+{
+ /*
+ Thread entry point; 'context' points to this thread's TEST_THREAD_DATA. The thread sets an error and checks that avro_strerror() returns exactly that text.
+ It then prefixes a second error and checks that avro_strerror() returns the second text followed by the first; any mismatch is recorded in the context.
+ */
+ TEST_THREAD_DATA *thread_context = (TEST_THREAD_DATA *)context;
+ char first_error_buffer[1024] = "";
+ char second_error_buffer[1024] = "";
+ char full_error_buffer[1024] = "";
+ const char *error_stack = NULL;
+ int index = thread_context->index;
+ unsigned sleep_interval_millis = thread_context->sleep_interval_millis;
+
+ // set an error whose text contains this thread's index
+ snprintf( first_error_buffer, sizeof(first_error_buffer), "thread %d set an error", index );
+ avro_set_error( "%s", first_error_buffer );
+
+ SLEEP_MILLIS( sleep_interval_millis ); /* pause between steps; presumably so that other threads' calls interleave with this one's */
+
+ // the error text must be exactly what this thread set
+ error_stack = avro_strerror();
+ if ( strcmp( error_stack, first_error_buffer ) != 0 )
+ {
+ thread_context->error_occured = 1;
+ snprintf( thread_context->error_message,
+ sizeof(thread_context->error_message),
+ "invalid error stack found: expected '%s' found '%s'", first_error_buffer, error_stack );
+ }
+
+ // prefix a second thread-specific error to the first one
+ SLEEP_MILLIS( sleep_interval_millis );
+ snprintf( second_error_buffer, sizeof(second_error_buffer), "thread %d set ANOTHER error...", index );
+ avro_prefix_error( "%s", second_error_buffer );
+ snprintf( full_error_buffer, sizeof(full_error_buffer), "%s%s", second_error_buffer, first_error_buffer ); /* expected text: second error immediately followed by the first */
+
+ // the error text must now hold both errors, newest first
+ SLEEP_MILLIS( sleep_interval_millis );
+ error_stack = avro_strerror();
+ if ( strcmp( error_stack, full_error_buffer ) != 0 )
+ {
+ thread_context->error_occured = 1; /* the message below overwrites any message recorded by the first check */
+ snprintf( thread_context->error_message,
+ sizeof(thread_context->error_message),
+ "invalid error stack found: expected '%s' found '%s'", full_error_buffer, error_stack );
+ }
+
+ return 0; /* failures are reported through the context, not through the return value */
+
+}
+
+static THR_HANDLE launch_thread( void *thread_context ) /* starts worker_thread() with thread_context as its argument and returns the new thread's handle */
+{
+#ifdef _WIN32
+ static const LPSECURITY_ATTRIBUTES DEFAULT_SECURITY_ATTIRBUTES = NULL;
+ static const SIZE_T DEFAULT_STACK_SIZE = 0;
+ static const DWORD DEFAULT_CREATION_FLAGS = 0;
+ DWORD thread_id = 0; /* passed to CreateThread as its last argument; not read afterwards */
+
+ return
+ CreateThread( DEFAULT_SECURITY_ATTIRBUTES,
+ DEFAULT_STACK_SIZE,
+ worker_thread,
+ thread_context,
+ DEFAULT_CREATION_FLAGS,
+ &thread_id ); /* the result is returned unchanged; main() treats a NULL handle as failure */
+#else
+ pthread_attr_t attr = {0};
+ pthread_t thread;
+ pthread_attr_init( &attr );
+ int status = 0;
+ status = pthread_create( &thread, &attr, worker_thread, thread_context );
+ pthread_attr_destroy(&attr);
+ if ( status != 0 )
+ return NULL; /* NOTE(review): assumes pthread_t is a pointer or integer type; the error code in 'status' is dropped */
+ return thread;
+#endif
+}
+
+static int join_thread( THR_HANDLE thread_handle ) /* waits for the given thread to finish; main() treats any non-zero result as failure */
+{
+#ifdef _WIN32
+ return
+ ( WaitForSingleObject( thread_handle, INFINITE ) == WAIT_OBJECT_0 ) ? 0 : -1; /* 0 only when the wait reports WAIT_OBJECT_0 */
+#else
+ return
+ pthread_join( thread_handle, NULL ); /* result passed through unchanged; the thread's exit value is not collected */
+#endif
+}
+
+static int get_random_value( int max_value ) /* returns rand() reduced modulo max_value; max_value must be non-zero */
+{
+ return
+ rand() % max_value; /* main() seeds rand() with srand() before the first call */
+}