You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2018/12/10 19:48:04 UTC

[avro] branch master updated: change the error module to be thread-safe on Windows as well as on UNIX (depends on defining the macro THREADSAFE). Add an appropriate test which launches several threads and validates that the errors they set/add are thread-safe.

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new e37a52d  change the error module to be thread-safe on Windows as well as on UNIX (depends on defining the macro THREADSAFE). Add an appropriate test which launches several threads and validates that the errors they set/add are thread-safe.
e37a52d is described below

commit e37a52d909b913059ba4493f509ede7d46dcf0e1
Author: Eliyahu-Machluf <el...@attunity.com>
AuthorDate: Sun Nov 27 12:09:02 2016 +0200

    Change the error module to be thread-safe on Windows as well as on UNIX (depends on defining the macro THREADSAFE).
    Add an appropriate test which launches several threads and validates that the errors they set/add are thread-safe.
---
 lang/c/src/errors.c                             |  48 ++++--
 lang/c/tests/test_avro_errors_are_thread_safe.c | 203 ++++++++++++++++++++++++
 2 files changed, 238 insertions(+), 13 deletions(-)

diff --git a/lang/c/src/errors.c b/lang/c/src/errors.c
index a033be7..23934a6 100644
--- a/lang/c/src/errors.c
+++ b/lang/c/src/errors.c
@@ -22,17 +22,6 @@
 
 #include "avro/errors.h"
 
-#if defined THREADSAFE && (defined __unix__ || defined __unix)
-#include <pthread.h>
-static pthread_key_t error_data_key;
-static pthread_once_t error_data_key_once = PTHREAD_ONCE_INIT;
-
-static void make_error_data_key()
-{
-    pthread_key_create(&error_data_key, free);
-}
-#endif
-
 /* 4K should be enough, right? */
 #define AVRO_ERROR_SIZE 4096
 
@@ -52,10 +41,30 @@ struct avro_error_data_t {
 };
 
 
+#if defined THREADSAFE /* per-thread error storage is only compiled in when THREADSAFE is defined */
+#if ( defined __unix__ || defined __unix )
+#include <pthread.h>
+static pthread_key_t error_data_key; /* thread-specific data key, created once by make_error_data_key() */
+static pthread_once_t error_data_key_once = PTHREAD_ONCE_INIT; /* passed to pthread_once() so the key is created a single time */
+/* Creates error_data_key and registers free() as the destructor for each thread's value. */
+static void make_error_data_key(void)
+{
+    pthread_key_create(&error_data_key, free);
+}
+#elif defined _WIN32
+#include <windows.h> /* lowercase spelling also resolves on case-sensitive file systems (e.g. MinGW cross builds) */
+/* One instance per thread (__declspec(thread)); both buffers start empty and both pointers start as NULL. */
+static __declspec( thread ) struct avro_error_data_t TLS_ERROR_DATA = { "", "", NULL, NULL };
+
+#endif /* __unix__ || __unix || _WIN32 */
+#endif /* THREADSAFE */
+
 static struct avro_error_data_t *
 avro_get_error_data(void)
 {
-#if defined THREADSAFE && (defined __unix__ || defined __unix)
+#if defined THREADSAFE  
+#if defined __unix__ || defined __unix
+
     pthread_once(&error_data_key_once, make_error_data_key);
 
     struct avro_error_data_t *ERROR_DATA =
@@ -72,7 +81,20 @@ avro_get_error_data(void)
     }
 
     return ERROR_DATA;
-#else
+
+#elif defined _WIN32
+	
+	if ( TLS_ERROR_DATA.AVRO_CURRENT_ERROR == NULL )
+	{
+		//first usage of the ERROR_DATA, initialize 'current' and 'other' pointers.
+		TLS_ERROR_DATA.AVRO_CURRENT_ERROR = TLS_ERROR_DATA.AVRO_ERROR1;
+		TLS_ERROR_DATA.AVRO_OTHER_ERROR = TLS_ERROR_DATA.AVRO_ERROR2;
+	}
+	return &TLS_ERROR_DATA;
+
+	#endif /* UNIX and WIN32 threadsafe handling */
+
+#else /* not thread-safe */  
     static struct avro_error_data_t ERROR_DATA = {
       /* .AVRO_ERROR1 = */ {'\0'},
       /* .AVRO_ERROR2 = */ {'\0'},
diff --git a/lang/c/tests/test_avro_errors_are_thread_safe.c b/lang/c/tests/test_avro_errors_are_thread_safe.c
new file mode 100644
index 0000000..8e2f70d
--- /dev/null
+++ b/lang/c/tests/test_avro_errors_are_thread_safe.c
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifdef _WIN32 /* platform shims: thread handle type, last-error accessor, millisecond sleep */
+#include <windows.h> /* lowercase spelling also resolves on case-sensitive file systems (e.g. MinGW cross builds) */
+#define THR_HANDLE HANDLE
+#define LAST_ERROR() GetLastError()
+#define SLEEP_MILLIS(millis) Sleep( millis )
+#else
+#include <pthread.h>
+#include <errno.h>
+#include <unistd.h>
+#define THR_HANDLE pthread_t
+#define LAST_ERROR() errno
+#define SLEEP_MILLIS(millis) usleep( (millis) * 1000 ) /* argument parenthesized so an expression is scaled as a whole */
+#endif
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+#include <avro.h>
+
+enum { /* sizing constants for the test */
+	NUMBER_OF_TEST_THREADS = 64, /* number of worker threads launched by main() */
+	THREAD_ERROR_BUFFER_SIZE = 1024, /* size in bytes of each thread's error_message buffer */
+	MAX_THREAD_SLEEP_MILLIS = 3000, /* exclusive upper bound given to get_random_value() for the per-thread sleep interval */
+};
+
+typedef struct _TEST_THREAD_DATA /* per-thread context: filled in by main(), handed to worker_thread() */
+{
+	int index; /* position of the thread in main()'s arrays; embedded in the error texts */
+	int error_occured; /* set to 1 by the worker when one of its checks fails */
+	char error_message[THREAD_ERROR_BUFFER_SIZE]; /* description of the failed check, printed by main() */
+	volatile int worker_thread; /* not referenced by the code in this file */
+	int sleep_interval_millis; /* delay the worker waits between its steps, in milliseconds */
+}TEST_THREAD_DATA;
+
+static int get_random_value( int max_value ); /* rand() reduced modulo max_value */
+#ifdef _WIN32
+static DWORD worker_thread( LPVOID lpThreadParameter ); /* NOTE(review): CreateThread documents its callback as DWORD WINAPI - confirm the calling convention */
+#else
+static void *worker_thread( void *p ); /* thread entry point; p is cast to TEST_THREAD_DATA * */
+#endif
+static THR_HANDLE launch_thread( void *thread_context ); /* starts worker_thread with the given context */
+static int join_thread( THR_HANDLE thread_handle ); /* waits for a thread to end; 0 means success */
+
+int main()
+{
+	TEST_THREAD_DATA threads_data[NUMBER_OF_TEST_THREADS];
+	THR_HANDLE thread_handle[NUMBER_OF_TEST_THREADS];
+	unsigned i;
+	int found_error = 0;
+	/* Test driver: launches the workers, joins them all, then reports every failure they recorded. */
+	srand( (unsigned)time( NULL ) );
+	/* Phase 1: give each worker its index and a random sleep interval, then start it. */
+	memset( threads_data, 0, sizeof(threads_data) );
+	for ( i = 0; i < NUMBER_OF_TEST_THREADS; i++ )
+	{
+		threads_data[i].index = (int)i;
+		threads_data[i].sleep_interval_millis = get_random_value( MAX_THREAD_SLEEP_MILLIS );
+		thread_handle[i] = launch_thread( &threads_data[i] );
+		if ( !thread_handle[i] )
+		{
+			fprintf( stderr, "failed to launch worker thread, error code is %d\n", (int)LAST_ERROR() ); /* cast: LAST_ERROR() is a DWORD on Windows */
+			return EXIT_FAILURE;
+		}
+	}
+	/* Phase 2: wait for every worker to finish. */
+	for ( i = 0; i < NUMBER_OF_TEST_THREADS; i++ )
+		if ( join_thread( thread_handle[i] ) != 0 )
+		{
+			fprintf( stderr, "failed to join thread %u, error code is %d\n", i, (int)LAST_ERROR() ); /* %u matches unsigned i */
+			return EXIT_FAILURE;
+		}
+	/* Phase 3: print every mismatch the workers recorded; the test fails if there is at least one. */
+	for ( i = 0; i < NUMBER_OF_TEST_THREADS; i++ )
+	{
+		if( threads_data[i].error_occured )
+		{
+			fprintf( stderr, "error occured at thread %u: %s\n", i, threads_data[i].error_message );
+			found_error = 1;
+		}
+	}
+	if ( found_error )
+		return EXIT_FAILURE;
+
+	/* No worker recorded a mismatch. */
+	return EXIT_SUCCESS;
+}
+
+#ifdef _WIN32
+static DWORD worker_thread( LPVOID context )
+#else
+static void *worker_thread( void *context )
+#endif
+{
+	/*
+	Thread entry point: sets a thread-specific error, sleeps, and checks that avro_strerror() still returns it.
+	It then prefixes a second error and checks that the result is the second message followed by the first.
+	A mismatch is recorded in the TEST_THREAD_DATA passed as context; the function always returns 0. */
+	TEST_THREAD_DATA *thread_context = (TEST_THREAD_DATA *)context;
+	char first_error_buffer[1024] = "";
+	char second_error_buffer[1024] = "";
+	char full_error_buffer[1024] = "";
+	const char *error_stack = NULL;
+	int index = thread_context->index;
+	unsigned sleep_interval_millis = thread_context->sleep_interval_millis;
+
+	// set an error whose text contains this thread's index
+	snprintf( first_error_buffer, sizeof(first_error_buffer), "thread %d set an error", index );
+	avro_set_error( "%s", first_error_buffer );
+	/* presumably the sleep gives the other workers time to set their own errors in the meantime */
+	SLEEP_MILLIS( sleep_interval_millis );
+
+	// validate that the error text is exactly the one this thread set
+	error_stack = avro_strerror();
+	if ( strcmp( error_stack, first_error_buffer ) != 0 )
+	{
+		thread_context->error_occured = 1;
+		snprintf( thread_context->error_message, 
+				  sizeof(thread_context->error_message),
+				  "invalid error stack found: expected '%s' found '%s'", first_error_buffer, error_stack );
+	}
+
+	// prefix a second thread-specific error
+	SLEEP_MILLIS( sleep_interval_millis );
+	snprintf( second_error_buffer, sizeof(second_error_buffer), "thread %d set ANOTHER error...", index );
+	avro_prefix_error( "%s", second_error_buffer );
+	snprintf( full_error_buffer, sizeof(full_error_buffer), "%s%s", second_error_buffer, first_error_buffer ); /* expected text: second message immediately followed by the first */
+
+	// validate that the error text contains both messages in the expected order
+	SLEEP_MILLIS( sleep_interval_millis );
+	error_stack = avro_strerror();
+	if ( strcmp( error_stack, full_error_buffer ) != 0 )
+	{
+		thread_context->error_occured = 1;
+		snprintf( thread_context->error_message, 
+				  sizeof(thread_context->error_message),
+				  "invalid error stack found: expected '%s' found '%s'", full_error_buffer, error_stack );
+	}
+
+	return 0;
+
+}
+
+static THR_HANDLE launch_thread( void *thread_context )
+{
+	/* Starts worker_thread with thread_context; returns the new thread's handle, or a zero handle on failure. */
+#ifdef _WIN32
+	static const LPSECURITY_ATTRIBUTES DEFAULT_SECURITY_ATTRIBUTES = NULL;
+	static const SIZE_T DEFAULT_STACK_SIZE = 0;
+	static const DWORD DEFAULT_CREATION_FLAGS = 0;
+	DWORD thread_id = 0;
+	return
+		CreateThread( DEFAULT_SECURITY_ATTRIBUTES,
+					  DEFAULT_STACK_SIZE,
+					  worker_thread,
+					  thread_context,
+					  DEFAULT_CREATION_FLAGS,
+					  &thread_id );
+#else
+	pthread_t thread;
+	/* NULL attributes select the defaults, which is what the initialized-but-unmodified attr object provided. */
+	int status = pthread_create( &thread, NULL, worker_thread, thread_context );
+	if ( status != 0 )
+	{
+		errno = status; /* pthread_create returns the error number instead of setting errno, which main() prints */
+		return (THR_HANDLE)0; /* failure marker tested by main(); assumes pthread_t is a scalar type */
+	}
+	return thread;
+#endif
+}
+
+static int join_thread( THR_HANDLE thread_handle )
+{
+	/* Waits for the thread to end and releases its handle; returns 0 on success, non-zero on failure. */
+#ifdef _WIN32
+	if ( WaitForSingleObject( thread_handle, INFINITE ) != WAIT_OBJECT_0 ) return -1; /* handle left open so GetLastError() is not disturbed */
+	return CloseHandle( thread_handle ) ? 0 : -1; /* a finished thread's handle must be closed or it leaks */
+#else
+	return pthread_join( thread_handle, NULL );
+#endif
+}
+
+static int get_random_value( int max_value )
+{
+	/* Returns a pseudo-random value in [0, max_value); returns 0 when max_value <= 0 instead of dividing by zero. */
+	return ( max_value > 0 ) ? rand() % max_value : 0;
+}