You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ma...@apache.org on 2010/01/28 03:06:35 UTC

svn commit: r903941 - in /hadoop/avro/trunk: ./ lang/c/ lang/c/docs/ lang/c/examples/ lang/c/src/ lang/c/tests/

Author: massie
Date: Thu Jan 28 02:06:31 2010
New Revision: 903941

URL: http://svn.apache.org/viewvc?rev=903941&view=rev
Log:
AVRO-384. Add schema projection to the C implementation

Added:
    hadoop/avro/trunk/lang/c/src/datum_skip.c
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/c/docs/index.txt
    hadoop/avro/trunk/lang/c/examples/quickstop.c
    hadoop/avro/trunk/lang/c/src/Makefile.am
    hadoop/avro/trunk/lang/c/src/avro.h
    hadoop/avro/trunk/lang/c/src/datum_read.c
    hadoop/avro/trunk/lang/c/src/io.c
    hadoop/avro/trunk/lang/c/tests/Makefile.am
    hadoop/avro/trunk/lang/c/tests/test_valgrind
    hadoop/avro/trunk/lang/c/version.sh

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Jan 28 02:06:31 2010
@@ -265,6 +265,8 @@
     AVRO-381. Update documentation to talk about reference counting and 
               memory management (massie)
 
+    AVRO-384. Add schema projection to the C implementation (massie)
+
   OPTIMIZATIONS
 
     AVRO-172. More efficient schema processing (massie)

Modified: hadoop/avro/trunk/lang/c/docs/index.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/docs/index.txt?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/docs/index.txt (original)
+++ hadoop/avro/trunk/lang/c/docs/index.txt Thu Jan 28 02:06:31 2010
@@ -154,16 +154,25 @@
 | 32 34 .. .. .. .. .. .. | .. .. .. .. .. .. .. .. |	24..............
 
 Now let's read all the records back out
-1 |           Dante |           Hicks |            (555) 123-4567 |   32
-2 |          Randal |          Graves |            (555) 123-5678 |   30
-3 |        Veronica |        Loughran |            (555) 123-0987 |   28
-4 |         Caitlin |            Bree |            (555) 123-2323 |   27
-5 |             Bob |          Silent |            (555) 123-6422 |   29
-6 |             Jay |             ??? |            (555) 123-9182 |   26
+1 |           Dante |           Hicks |  (555) 123-4567 | 32
+2 |          Randal |          Graves |  (555) 123-5678 | 30
+3 |        Veronica |        Loughran |  (555) 123-0987 | 28
+4 |         Caitlin |            Bree |  (555) 123-2323 | 27
+5 |             Bob |          Silent |  (555) 123-6422 | 29
+6 |             Jay |             ??? |  (555) 123-9182 | 26
+
+
+Use projection to print only the First name and phone numbers
+          Dante |  (555) 123-4567 | 
+         Randal |  (555) 123-5678 | 
+       Veronica |  (555) 123-0987 | 
+        Caitlin |  (555) 123-2323 | 
+            Bob |  (555) 123-6422 | 
+            Jay |  (555) 123-9182 | 
 ----
 
-The *Quick Stop* store owner was so pleased, he asked you to create a 
-movie database for his *RST Video* store.  
+The *Quick Stop* owner was so pleased that he asked you to create a
+movie database for his *RST Video* store.
 
 == Reference files
 

Modified: hadoop/avro/trunk/lang/c/examples/quickstop.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/examples/quickstop.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/examples/quickstop.c (original)
+++ hadoop/avro/trunk/lang/c/examples/quickstop.c Thu Jan 28 02:06:31 2010
@@ -20,8 +20,6 @@
 
 char buf[4096];
 avro_schema_t person_schema;
-avro_reader_t reader;
-avro_writer_t writer;
 int64_t id = 0;
 
 /* A simple schema for our tutorial */
@@ -48,7 +46,8 @@
 
 /* Create a datum to match the person schema and save it */
 void
-add_person(const char *first, const char *last, const char *phone, int32_t age)
+add_person(avro_writer_t writer, const char *first, const char *last,
+	   const char *phone, int32_t age)
 {
 	avro_datum_t person = avro_record("Person");
 
@@ -84,12 +83,12 @@
 	fprintf(stdout, "Successfully added %s, %s id=%ld\n", last, first, id);
 }
 
-int print_person(void)
+int print_person(avro_reader_t reader, avro_schema_t reader_schema)
 {
 	int rval;
 	avro_datum_t person;
 
-	rval = avro_read_data(reader, person_schema, NULL, &person);
+	rval = avro_read_data(reader, person_schema, reader_schema, &person);
 	if (rval == 0) {
 		int64_t i64;
 		int32_t i32;
@@ -97,22 +96,27 @@
 		avro_datum_t id_datum, first_datum, last_datum, phone_datum,
 		    age_datum;
 
-		avro_record_get(person, "ID", &id_datum);
-		avro_record_get(person, "First", &first_datum);
-		avro_record_get(person, "Last", &last_datum);
-		avro_record_get(person, "Phone", &phone_datum);
-		avro_record_get(person, "Age", &age_datum);
-
-		avro_int64_get(id_datum, &i64);
-		fprintf(stdout, "%ld | ", i64);
-		avro_string_get(first_datum, &p);
-		fprintf(stdout, "%15s | ", p);
-		avro_string_get(last_datum, &p);
-		fprintf(stdout, "%15s | ", p);
-		avro_string_get(phone_datum, &p);
-		fprintf(stdout, "%25s | ", p);
-		avro_int32_get(age_datum, &i32);
-		fprintf(stdout, "  %2d\n", i32);
+		if (avro_record_get(person, "ID", &id_datum) == 0) {
+			avro_int64_get(id_datum, &i64);
+			fprintf(stdout, "%ld | ", i64);
+		}
+		if (avro_record_get(person, "First", &first_datum) == 0) {
+			avro_string_get(first_datum, &p);
+			fprintf(stdout, "%15s | ", p);
+		}
+		if (avro_record_get(person, "Last", &last_datum) == 0) {
+			avro_string_get(last_datum, &p);
+			fprintf(stdout, "%15s | ", p);
+		}
+		if (avro_record_get(person, "Phone", &phone_datum) == 0) {
+			avro_string_get(phone_datum, &p);
+			fprintf(stdout, "%15s | ", p);
+		}
+		if (avro_record_get(person, "Age", &age_datum) == 0) {
+			avro_int32_get(age_datum, &i32);
+			fprintf(stdout, "%ld", i32);
+		}
+		fprintf(stdout, "\n");
 
 		/* We no longer need this memory */
 		avro_datum_decref(person);
@@ -122,22 +126,23 @@
 
 int main(void)
 {
+	avro_reader_t reader;
+	avro_writer_t writer;
+	avro_schema_t projection_schema, first_name_schema, phone_schema;
 	int64_t i;
 
-	/* Create readers and writers backed by memory */
-	writer = avro_writer_memory(buf, sizeof(buf));
-	reader = avro_reader_memory(buf, sizeof(buf));
-
 	/* Initialize the schema structure from JSON */
 	init_schema();
 
 	/* Add people to the database */
-	add_person("Dante", "Hicks", "(555) 123-4567", 32);
-	add_person("Randal", "Graves", "(555) 123-5678", 30);
-	add_person("Veronica", "Loughran", "(555) 123-0987", 28);
-	add_person("Caitlin", "Bree", "(555) 123-2323", 27);
-	add_person("Bob", "Silent", "(555) 123-6422", 29);
-	add_person("Jay", "???", "(555) 123-9182", 26);
+	writer = avro_writer_memory(buf, sizeof(buf));
+	add_person(writer, "Dante", "Hicks", "(555) 123-4567", 32);
+	add_person(writer, "Randal", "Graves", "(555) 123-5678", 30);
+	add_person(writer, "Veronica", "Loughran", "(555) 123-0987", 28);
+	add_person(writer, "Caitlin", "Bree", "(555) 123-2323", 27);
+	add_person(writer, "Bob", "Silent", "(555) 123-6422", 29);
+	add_person(writer, "Jay", "???", "(555) 123-9182", 26);
+	avro_writer_free(writer);
 
 	fprintf(stdout,
 		"\nAvro is compact. Here is the data for all %ld people.\n",
@@ -147,18 +152,43 @@
 	fprintf(stdout, "\nNow let's read all the records back out\n");
 
 	/* Read all the records and print them */
+	reader = avro_reader_memory(buf, sizeof(buf));
 	for (i = 0; i < id; i++) {
-		if (print_person()) {
+		if (print_person(reader, NULL)) {
 			fprintf(stderr, "Error printing person\n");
 			exit(EXIT_FAILURE);
 		}
 	}
+	avro_reader_free(reader);
 
-	/* We don't need this schema anymore */
-	avro_schema_decref(person_schema);
+	/* You can also use projection to decode only the data you are
+	   interested in.  This is particularly useful when you have
+	   huge data sets and are only interested in particular fields,
+	   e.g. your contacts' first names and phone numbers. */
+	projection_schema = avro_schema_record("Person");
+	first_name_schema = avro_schema_string();
+	phone_schema = avro_schema_string();
+	avro_schema_record_field_append(projection_schema, "First",
+					first_name_schema);
+	avro_schema_record_field_append(projection_schema, "Phone",
+					phone_schema);
 
-	/* We dont' need the reader/writer anymore */
+	/* Read back only the fields you're interested in */
+	fprintf(stdout,
+		"\n\nUse projection to print only the First name and phone numbers\n");
+	reader = avro_reader_memory(buf, sizeof(buf));
+	for (i = 0; i < id; i++) {
+		if (print_person(reader, projection_schema)) {
+			fprintf(stderr, "Error printing person\n");
+			exit(EXIT_FAILURE);
+		}
+	}
 	avro_reader_free(reader);
-	avro_writer_free(writer);
+	avro_schema_decref(first_name_schema);
+	avro_schema_decref(phone_schema);
+	avro_schema_decref(projection_schema);
+
+	/* We don't need this schema anymore */
+	avro_schema_decref(person_schema);
 	return 0;
 }

Modified: hadoop/avro/trunk/lang/c/src/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/Makefile.am?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c/src/Makefile.am Thu Jan 28 02:06:31 2010
@@ -7,7 +7,7 @@
 
 lib_LTLIBRARIES = libavro.la
 libavro_la_SOURCES = st.c st.h schema.c schema.h schema_printf.c schema_equal.c \
-datum.c datum_equal.c datum_validate.c datum_read.c datum_write.c datum.h \
+datum.c datum_equal.c datum_validate.c datum_read.c datum_skip.c datum_write.c datum.h \
 io.c dump.c dump.h encoding_binary.c \
 container_of.h queue.h encoding.h
 libavro_la_LIBADD = $(top_builddir)/jansson/src/.libs/libjansson.a

Modified: hadoop/avro/trunk/lang/c/src/avro.h
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/avro.h?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/avro.h (original)
+++ hadoop/avro/trunk/lang/c/src/avro.h Thu Jan 28 02:06:31 2010
@@ -255,6 +255,7 @@
 int avro_read_data(avro_reader_t reader,
 		   avro_schema_t writer_schema,
 		   avro_schema_t reader_schema, avro_datum_t * datum);
+/* Advance reader past one datum that was written with writer_schema,
+   without building an avro_datum_t for it.  Returns 0 on success or a
+   nonzero error code (EINVAL for a NULL reader or an invalid schema). */
+int avro_skip_data(avro_reader_t reader, avro_schema_t writer_schema);
 int avro_write_data(avro_writer_t writer,
 		    avro_schema_t writer_schema, avro_datum_t datum);
 

Modified: hadoop/avro/trunk/lang/c/src/datum_read.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/datum_read.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/datum_read.c (original)
+++ hadoop/avro/trunk/lang/c/src/datum_read.c Thu Jan 28 02:06:31 2010
@@ -285,8 +285,10 @@
 			}
 			avro_datum_decref(field_datum);
 		} else {
-			/* TODO: skip_record */
-			return -1;
+			rval = avro_skip_data(reader, field->type);
+			if (rval) {
+				return rval;
+			}
 		}
 	}
 	return 0;

Added: hadoop/avro/trunk/lang/c/src/datum_skip.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/datum_skip.c?rev=903941&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c/src/datum_skip.c (added)
+++ hadoop/avro/trunk/lang/c/src/datum_skip.c Thu Jan 28 02:06:31 2010
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License. 
+ */
+#include <stdlib.h>
+#include <errno.h>
+#include <string.h>
+#include "encoding.h"
+#include "schema.h"
+
+/*
+ * Skip over one array datum whose elements use writers_schema->items.
+ *
+ * An array is written as a sequence of blocks terminated by a zero item
+ * count.  A negative count is followed by the block's size in bytes, so
+ * such a block is skipped with a single avro_skip() call rather than item
+ * by item.  The count is never negated (negating INT64_MIN would be
+ * signed overflow).
+ *
+ * Returns 0 on success, EILSEQ on a negative block size, or the error
+ * code from the underlying read/skip.
+ */
+static int skip_array(avro_reader_t reader, const avro_encoding_t * enc,
+		      struct avro_array_schema_t *writers_schema)
+{
+	int rval;
+	int64_t i;
+	int64_t block_count;
+	int64_t block_size;
+
+	rval = enc->read_long(reader, &block_count);
+	if (rval) {
+		return rval;
+	}
+
+	while (block_count != 0) {
+		if (block_count < 0) {
+			/* Block size follows: jump over the whole block. */
+			rval = enc->read_long(reader, &block_size);
+			if (rval) {
+				return rval;
+			}
+			if (block_size < 0) {
+				return EILSEQ;
+			}
+			rval = avro_skip(reader, block_size);
+			if (rval) {
+				return rval;
+			}
+		} else {
+			for (i = 0; i < block_count; i++) {
+				rval = avro_skip_data(reader,
+						      writers_schema->items);
+				if (rval) {
+					return rval;
+				}
+			}
+		}
+
+		rval = enc->read_long(reader, &block_count);
+		if (rval) {
+			return rval;
+		}
+	}
+	return 0;
+}
+
+/*
+ * Skip over one map datum.  Each entry is a string key (skipped with
+ * enc->skip_string) followed by a value skipped with
+ * writers_schema->values.
+ *
+ * Like arrays, a map is a sequence of blocks terminated by a zero count.
+ * A negative count is followed by the block's size in bytes, so that
+ * block is skipped in a single avro_skip() call instead of entry by
+ * entry; the count is never negated (negating INT64_MIN would overflow).
+ *
+ * Returns 0 on success, EILSEQ on a negative block size, or the error
+ * code from the underlying read/skip.
+ */
+static int skip_map(avro_reader_t reader, const avro_encoding_t * enc,
+		    struct avro_map_schema_t *writers_schema)
+{
+	int rval;
+	int64_t i, block_count, block_size;
+
+	rval = enc->read_long(reader, &block_count);
+	if (rval) {
+		return rval;
+	}
+	while (block_count != 0) {
+		if (block_count < 0) {
+			/* Block size follows: jump over the whole block. */
+			rval = enc->read_long(reader, &block_size);
+			if (rval) {
+				return rval;
+			}
+			if (block_size < 0) {
+				return EILSEQ;
+			}
+			rval = avro_skip(reader, block_size);
+			if (rval) {
+				return rval;
+			}
+		} else {
+			for (i = 0; i < block_count; i++) {
+				rval = enc->skip_string(reader);
+				if (rval) {
+					return rval;
+				}
+				/* writers_schema is already the map schema
+				   struct; no further conversion is needed. */
+				rval = avro_skip_data(reader,
+						      writers_schema->values);
+				if (rval) {
+					return rval;
+				}
+			}
+		}
+		rval = enc->read_long(reader, &block_count);
+		if (rval) {
+			return rval;
+		}
+	}
+	return 0;
+}
+
+/*
+ * Skip over one union datum: read the branch index, then skip the value
+ * using that branch's schema.
+ *
+ * Returns EILSEQ if the index does not name a branch of writers_schema
+ * (negative or past the end), otherwise the result of skipping the value.
+ */
+static int skip_union(avro_reader_t reader, const avro_encoding_t * enc,
+		      struct avro_union_schema_t *writers_schema)
+{
+	int rval;
+	int64_t i, branch_index;
+	struct avro_union_branch_t *branch;
+
+	rval = enc->read_long(reader, &branch_index);
+	if (rval) {
+		return rval;
+	}
+
+	/* Walk branch_index steps down the branch list.  i must advance
+	   together with the list; a negative or too-large index runs off
+	   the end and leaves branch NULL. */
+	branch = STAILQ_FIRST(&writers_schema->branches);
+	for (i = 0; i != branch_index && branch != NULL; i++) {
+		branch = STAILQ_NEXT(branch, branches);
+	}
+	if (!branch) {
+		return EILSEQ;
+	}
+	return avro_skip_data(reader, branch->schema);
+}
+
+/*
+ * Skip over one record datum by skipping each field in the order the
+ * fields are listed in the writer's schema.  enc is not used here; every
+ * field goes through avro_skip_data().  Stops at the first nonzero
+ * result and returns it; returns 0 once all fields are skipped.
+ */
+static int skip_record(avro_reader_t reader, const avro_encoding_t * enc,
+		       struct avro_record_schema_t *writers_schema)
+{
+	struct avro_record_field_t *field =
+	    STAILQ_FIRST(&writers_schema->fields);
+
+	(void)enc;
+	while (field != NULL) {
+		int status = avro_skip_data(reader, field->type);
+		if (status != 0) {
+			return status;
+		}
+		field = STAILQ_NEXT(field, fields);
+	}
+	return 0;
+}
+
+/*
+ * Advance reader past a single datum written with writers_schema,
+ * without materialising it.  Only the binary encoding is used.
+ *
+ * Returns 0 on success; EINVAL for a NULL reader, an invalid schema or a
+ * schema type not handled below; otherwise whatever the per-type skip
+ * routine reports.
+ */
+int avro_skip_data(avro_reader_t reader, avro_schema_t writers_schema)
+{
+	const avro_encoding_t *enc = &avro_binary_encoding;
+
+	if (!reader || !is_avro_schema(writers_schema)) {
+		return EINVAL;
+	}
+
+	switch (avro_typeof(writers_schema)) {
+	/* Primitive types: delegate to the encoding's skip routines. */
+	case AVRO_NULL:
+		return enc->skip_null(reader);
+	case AVRO_BOOLEAN:
+		return enc->skip_boolean(reader);
+	case AVRO_STRING:
+		return enc->skip_string(reader);
+	case AVRO_INT32:
+		return enc->skip_int(reader);
+	case AVRO_INT64:
+		return enc->skip_long(reader);
+	case AVRO_FLOAT:
+		return enc->skip_float(reader);
+	case AVRO_DOUBLE:
+		return enc->skip_double(reader);
+	case AVRO_BYTES:
+		return enc->skip_bytes(reader);
+
+	/* Fixed: skip exactly the schema's declared size in bytes. */
+	case AVRO_FIXED:
+		return avro_skip(reader,
+				 avro_schema_to_fixed(writers_schema)->size);
+
+	/* Enum values are skipped with the long skipper. */
+	case AVRO_ENUM:
+		return enc->skip_long(reader);
+
+	/* Complex types: recurse through the static helpers. */
+	case AVRO_ARRAY:
+		return skip_array(reader, enc,
+				  avro_schema_to_array(writers_schema));
+	case AVRO_MAP:
+		return skip_map(reader, enc,
+				avro_schema_to_map(writers_schema));
+	case AVRO_UNION:
+		return skip_union(reader, enc,
+				  avro_schema_to_union(writers_schema));
+	case AVRO_RECORD:
+		return skip_record(reader, enc,
+				   avro_schema_to_record(writers_schema));
+
+	/* Link: skip using the schema it points to (->to). */
+	case AVRO_LINK:
+		return avro_skip_data(reader,
+				      (avro_schema_to_link(writers_schema))->to);
+
+	default:
+		break;
+	}
+	return EINVAL;
+}

Modified: hadoop/avro/trunk/lang/c/src/io.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/io.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/io.c (original)
+++ hadoop/avro/trunk/lang/c/src/io.c Thu Jan 28 02:06:31 2010
@@ -139,7 +139,7 @@
 static int
 avro_read_memory(struct avro_memory_reader_t *reader, void *buf, int64_t len)
 {
-	if (len) {
+	if (len > 0) {
 		if ((reader->len - reader->read) < len) {
 			return ENOSPC;
 		}
@@ -174,6 +174,42 @@
 	return EINVAL;
 }
 
+/*
+ * Advance a memory-backed reader by len bytes without copying anything.
+ * A len of zero or less does nothing.  Returns ENOSPC, leaving the read
+ * position unchanged, when fewer than len bytes remain; 0 otherwise.
+ */
+static int avro_skip_memory(struct avro_memory_reader_t *reader, int64_t len)
+{
+	if (len <= 0) {
+		return 0;
+	}
+	if (len > reader->len - reader->read) {
+		return ENOSPC;
+	}
+	reader->read += len;
+	return 0;
+}
+
+/*
+ * Advance a file-backed reader by len bytes with a relative fseek().
+ * A len of zero or less does nothing.  On failure the negative fseek()
+ * result is returned unchanged; 0 means success.
+ * NOTE(review): fseek() takes a long, so len is narrowed on platforms
+ * where long is 32 bits -- confirm that is acceptable.
+ */
+static int avro_skip_file(struct avro_file_reader_t *reader, int64_t len)
+{
+	int seek_status;
+
+	if (len <= 0) {
+		return 0;
+	}
+	seek_status = fseek(reader->fp, len, SEEK_CUR);
+	return (seek_status < 0) ? seek_status : 0;
+}
+
+/*
+ * Advance reader by len bytes without returning the data.
+ *
+ * Returns 0 on success, ENOSPC if a memory reader has fewer than len
+ * bytes left, the negative fseek() result if a file reader fails to seek,
+ * and EINVAL for a negative len or a reader that is neither memory- nor
+ * file-backed.  A negative len is reported instead of silently treated
+ * as success, so callers decoding a corrupt length do not continue from
+ * a desynchronised position.
+ */
+int avro_skip(avro_reader_t reader, int64_t len)
+{
+	if (len < 0) {
+		return EINVAL;
+	}
+	if (is_memory_io(reader)) {
+		return avro_skip_memory(avro_reader_to_memory(reader), len);
+	} else if (is_file_io(reader)) {
+		return avro_skip_file(avro_reader_to_file(reader), len);
+	}
+	return EINVAL;
+}
+
 static int
 avro_write_memory(struct avro_memory_writer_t *writer, void *buf, int64_t len)
 {
@@ -206,7 +242,7 @@
 		if (is_memory_io(writer)) {
 			return avro_write_memory(avro_writer_to_memory(writer),
 						 buf, len);
-		} else if (is_memory_io(writer)) {
+		} else if (is_file_io(writer)) {
 			return avro_write_file(avro_writer_to_file(writer), buf,
 					       len);
 		}
@@ -214,14 +250,6 @@
 	return EINVAL;
 }
 
-int avro_skip(avro_reader_t reader, int64_t len)
-{
-	/*
-	 * TODO 
-	 */
-	return -1;
-}
-
 void avro_writer_dump(avro_writer_t writer, FILE * fp)
 {
 	if (is_memory_io(writer)) {

Modified: hadoop/avro/trunk/lang/c/tests/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/tests/Makefile.am?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/tests/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c/tests/Makefile.am Thu Jan 28 02:06:31 2010
@@ -15,7 +15,4 @@
 test_avro_data_SOURCES=test_avro_data.c
 test_avro_data_LDADD=$(test_LDADD)
 
-test_avro_interop_SOURCES=test_avro_interop.c
-test_avro_interop_LDADD=$(test_LDADD)
-
 TESTS=$(check_PROGRAMS) test_valgrind

Modified: hadoop/avro/trunk/lang/c/tests/test_valgrind
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/tests/test_valgrind?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/tests/test_valgrind (original)
+++ hadoop/avro/trunk/lang/c/tests/test_valgrind Thu Jan 28 02:06:31 2010
@@ -22,8 +22,7 @@
 	exit 77
 fi
 
-LD_LIBRARY_PATH="../src/.libs/:${LD_LIBRARY_PATH}"
-valgrind --leak-check=full --show-reachable=yes -q .libs/test_avro_data 2>&1 |\
+LD_LIBRARY_PATH="../src/.libs/" valgrind --leak-check=full --show-reachable=yes -q .libs/test_avro_data 2>&1 |\
 grep -E '^==[0-9]+== '
 if [ $? -eq 0 ]; then
 	# Expression found. Test failed.

Modified: hadoop/avro/trunk/lang/c/version.sh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/version.sh?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/version.sh (original)
+++ hadoop/avro/trunk/lang/c/version.sh Thu Jan 28 02:06:31 2010
@@ -18,7 +18,7 @@
 #         libavro_binary_age = 0
 #         libavro_interface_age = 0
 #
-libavro_micro_version=14
+libavro_micro_version=15
 libavro_interface_age=0
 libavro_binary_age=0