You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ma...@apache.org on 2010/01/28 03:06:35 UTC
svn commit: r903941 - in /hadoop/avro/trunk: ./ lang/c/ lang/c/docs/
lang/c/examples/ lang/c/src/ lang/c/tests/
Author: massie
Date: Thu Jan 28 02:06:31 2010
New Revision: 903941
URL: http://svn.apache.org/viewvc?rev=903941&view=rev
Log:
AVRO-384. Add schema projection to the C implementation
Added:
hadoop/avro/trunk/lang/c/src/datum_skip.c
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/c/docs/index.txt
hadoop/avro/trunk/lang/c/examples/quickstop.c
hadoop/avro/trunk/lang/c/src/Makefile.am
hadoop/avro/trunk/lang/c/src/avro.h
hadoop/avro/trunk/lang/c/src/datum_read.c
hadoop/avro/trunk/lang/c/src/io.c
hadoop/avro/trunk/lang/c/tests/Makefile.am
hadoop/avro/trunk/lang/c/tests/test_valgrind
hadoop/avro/trunk/lang/c/version.sh
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Jan 28 02:06:31 2010
@@ -265,6 +265,8 @@
AVRO-381. Update documentation to talk about reference counting and
memory management (massie)
+ AVRO-384. Add schema projection to the C implementation (massie)
+
OPTIMIZATIONS
AVRO-172. More efficient schema processing (massie)
Modified: hadoop/avro/trunk/lang/c/docs/index.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/docs/index.txt?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/docs/index.txt (original)
+++ hadoop/avro/trunk/lang/c/docs/index.txt Thu Jan 28 02:06:31 2010
@@ -154,16 +154,25 @@
| 32 34 .. .. .. .. .. .. | .. .. .. .. .. .. .. .. | 24..............
Now let's read all the records back out
-1 | Dante | Hicks | (555) 123-4567 | 32
-2 | Randal | Graves | (555) 123-5678 | 30
-3 | Veronica | Loughran | (555) 123-0987 | 28
-4 | Caitlin | Bree | (555) 123-2323 | 27
-5 | Bob | Silent | (555) 123-6422 | 29
-6 | Jay | ??? | (555) 123-9182 | 26
+1 | Dante | Hicks | (555) 123-4567 | 32
+2 | Randal | Graves | (555) 123-5678 | 30
+3 | Veronica | Loughran | (555) 123-0987 | 28
+4 | Caitlin | Bree | (555) 123-2323 | 27
+5 | Bob | Silent | (555) 123-6422 | 29
+6 | Jay | ??? | (555) 123-9182 | 26
+
+
+Use projection to print only the First name and phone numbers
+ Dante | (555) 123-4567 |
+ Randal | (555) 123-5678 |
+ Veronica | (555) 123-0987 |
+ Caitlin | (555) 123-2323 |
+ Bob | (555) 123-6422 |
+ Jay | (555) 123-9182 |
----
-The *Quick Stop* store owner was so pleased, he asked you to create a
-movie database for his *RST Video* store.
+The *Quick Stop* owner was so pleased, he asked you to create a
+movie database for his *RST Video* store.
== Reference files
Modified: hadoop/avro/trunk/lang/c/examples/quickstop.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/examples/quickstop.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/examples/quickstop.c (original)
+++ hadoop/avro/trunk/lang/c/examples/quickstop.c Thu Jan 28 02:06:31 2010
@@ -20,8 +20,6 @@
char buf[4096];
avro_schema_t person_schema;
-avro_reader_t reader;
-avro_writer_t writer;
int64_t id = 0;
/* A simple schema for our tutorial */
@@ -48,7 +46,8 @@
/* Create a datum to match the person schema and save it */
void
-add_person(const char *first, const char *last, const char *phone, int32_t age)
+add_person(avro_writer_t writer, const char *first, const char *last,
+ const char *phone, int32_t age)
{
avro_datum_t person = avro_record("Person");
@@ -84,12 +83,12 @@
fprintf(stdout, "Successfully added %s, %s id=%ld\n", last, first, id);
}
-int print_person(void)
+int print_person(avro_reader_t reader, avro_schema_t reader_schema)
{
int rval;
avro_datum_t person;
- rval = avro_read_data(reader, person_schema, NULL, &person);
+ rval = avro_read_data(reader, person_schema, reader_schema, &person);
if (rval == 0) {
int64_t i64;
int32_t i32;
@@ -97,22 +96,27 @@
avro_datum_t id_datum, first_datum, last_datum, phone_datum,
age_datum;
- avro_record_get(person, "ID", &id_datum);
- avro_record_get(person, "First", &first_datum);
- avro_record_get(person, "Last", &last_datum);
- avro_record_get(person, "Phone", &phone_datum);
- avro_record_get(person, "Age", &age_datum);
-
- avro_int64_get(id_datum, &i64);
- fprintf(stdout, "%ld | ", i64);
- avro_string_get(first_datum, &p);
- fprintf(stdout, "%15s | ", p);
- avro_string_get(last_datum, &p);
- fprintf(stdout, "%15s | ", p);
- avro_string_get(phone_datum, &p);
- fprintf(stdout, "%25s | ", p);
- avro_int32_get(age_datum, &i32);
- fprintf(stdout, " %2d\n", i32);
+ if (avro_record_get(person, "ID", &id_datum) == 0) {
+ avro_int64_get(id_datum, &i64);
+ fprintf(stdout, "%ld | ", i64);
+ }
+ if (avro_record_get(person, "First", &first_datum) == 0) {
+ avro_string_get(first_datum, &p);
+ fprintf(stdout, "%15s | ", p);
+ }
+ if (avro_record_get(person, "Last", &last_datum) == 0) {
+ avro_string_get(last_datum, &p);
+ fprintf(stdout, "%15s | ", p);
+ }
+ if (avro_record_get(person, "Phone", &phone_datum) == 0) {
+ avro_string_get(phone_datum, &p);
+ fprintf(stdout, "%15s | ", p);
+ }
+ if (avro_record_get(person, "Age", &age_datum) == 0) {
+ avro_int32_get(age_datum, &i32);
+ fprintf(stdout, "%ld", i32);
+ }
+ fprintf(stdout, "\n");
/* We no longer need this memory */
avro_datum_decref(person);
@@ -122,22 +126,23 @@
int main(void)
{
+ avro_reader_t reader;
+ avro_writer_t writer;
+ avro_schema_t projection_schema, first_name_schema, phone_schema;
int64_t i;
- /* Create readers and writers backed by memory */
- writer = avro_writer_memory(buf, sizeof(buf));
- reader = avro_reader_memory(buf, sizeof(buf));
-
/* Initialize the schema structure from JSON */
init_schema();
/* Add people to the database */
- add_person("Dante", "Hicks", "(555) 123-4567", 32);
- add_person("Randal", "Graves", "(555) 123-5678", 30);
- add_person("Veronica", "Loughran", "(555) 123-0987", 28);
- add_person("Caitlin", "Bree", "(555) 123-2323", 27);
- add_person("Bob", "Silent", "(555) 123-6422", 29);
- add_person("Jay", "???", "(555) 123-9182", 26);
+ writer = avro_writer_memory(buf, sizeof(buf));
+ add_person(writer, "Dante", "Hicks", "(555) 123-4567", 32);
+ add_person(writer, "Randal", "Graves", "(555) 123-5678", 30);
+ add_person(writer, "Veronica", "Loughran", "(555) 123-0987", 28);
+ add_person(writer, "Caitlin", "Bree", "(555) 123-2323", 27);
+ add_person(writer, "Bob", "Silent", "(555) 123-6422", 29);
+ add_person(writer, "Jay", "???", "(555) 123-9182", 26);
+ avro_writer_free(writer);
fprintf(stdout,
"\nAvro is compact. Here is the data for all %ld people.\n",
@@ -147,18 +152,43 @@
fprintf(stdout, "\nNow let's read all the records back out\n");
/* Read all the records and print them */
+ reader = avro_reader_memory(buf, sizeof(buf));
for (i = 0; i < id; i++) {
- if (print_person()) {
+ if (print_person(reader, NULL)) {
fprintf(stderr, "Error printing person\n");
exit(EXIT_FAILURE);
}
}
+ avro_reader_free(reader);
- /* We don't need this schema anymore */
- avro_schema_decref(person_schema);
+ /* You can also use projection to decode only the data you are
+ interested in. This is particularly useful when you have
+ huge data sets and you're only interested in particular fields,
+ e.g. your contacts' first names and phone numbers */
+ projection_schema = avro_schema_record("Person");
+ first_name_schema = avro_schema_string();
+ phone_schema = avro_schema_string();
+ avro_schema_record_field_append(projection_schema, "First",
+ first_name_schema);
+ avro_schema_record_field_append(projection_schema, "Phone",
+ phone_schema);
- /* We dont' need the reader/writer anymore */
+ /* Read only the record you're interested in */
+ fprintf(stdout,
+ "\n\nUse projection to print only the First name and phone numbers\n");
+ reader = avro_reader_memory(buf, sizeof(buf));
+ for (i = 0; i < id; i++) {
+ if (print_person(reader, projection_schema)) {
+ fprintf(stderr, "Error printing person\n");
+ exit(EXIT_FAILURE);
+ }
+ }
avro_reader_free(reader);
- avro_writer_free(writer);
+ avro_schema_decref(first_name_schema);
+ avro_schema_decref(phone_schema);
+ avro_schema_decref(projection_schema);
+
+ /* We don't need this schema anymore */
+ avro_schema_decref(person_schema);
return 0;
}
Modified: hadoop/avro/trunk/lang/c/src/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/Makefile.am?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c/src/Makefile.am Thu Jan 28 02:06:31 2010
@@ -7,7 +7,7 @@
lib_LTLIBRARIES = libavro.la
libavro_la_SOURCES = st.c st.h schema.c schema.h schema_printf.c schema_equal.c \
-datum.c datum_equal.c datum_validate.c datum_read.c datum_write.c datum.h \
+datum.c datum_equal.c datum_validate.c datum_read.c datum_skip.c datum_write.c datum.h \
io.c dump.c dump.h encoding_binary.c \
container_of.h queue.h encoding.h
libavro_la_LIBADD = $(top_builddir)/jansson/src/.libs/libjansson.a
Modified: hadoop/avro/trunk/lang/c/src/avro.h
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/avro.h?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/avro.h (original)
+++ hadoop/avro/trunk/lang/c/src/avro.h Thu Jan 28 02:06:31 2010
@@ -255,6 +255,7 @@
int avro_read_data(avro_reader_t reader,
avro_schema_t writer_schema,
avro_schema_t reader_schema, avro_datum_t * datum);
+int avro_skip_data(avro_reader_t reader, avro_schema_t writer_schema);
int avro_write_data(avro_writer_t writer,
avro_schema_t writer_schema, avro_datum_t datum);
Modified: hadoop/avro/trunk/lang/c/src/datum_read.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/datum_read.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/datum_read.c (original)
+++ hadoop/avro/trunk/lang/c/src/datum_read.c Thu Jan 28 02:06:31 2010
@@ -285,8 +285,10 @@
}
avro_datum_decref(field_datum);
} else {
- /* TODO: skip_record */
- return -1;
+ rval = avro_skip_data(reader, field->type);
+ if (rval) {
+ return rval;
+ }
}
}
return 0;
Added: hadoop/avro/trunk/lang/c/src/datum_skip.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/datum_skip.c?rev=903941&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c/src/datum_skip.c (added)
+++ hadoop/avro/trunk/lang/c/src/datum_skip.c Thu Jan 28 02:06:31 2010
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+#include <errno.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include "encoding.h"
+#include "schema.h"
+
+/*
+ * Skip over an array value in the stream, using the writer's array schema.
+ *
+ * Reads a block count, skips that many items with avro_skip_data(), and
+ * repeats until a block count of zero is read.  A negative block count is
+ * followed by one extra long (the block size); that long is read and
+ * discarded, and the absolute value of the count is used as the item count.
+ *
+ * Returns 0 on success, EILSEQ if a negative block count cannot be negated,
+ * or the first non-zero code returned by enc->read_long() or
+ * avro_skip_data().
+ */
+static int skip_array(avro_reader_t reader, const avro_encoding_t * enc,
+		      struct avro_array_schema_t *writers_schema)
+{
+	int rval;
+	int64_t i;
+	int64_t block_count;
+
+	rval = enc->read_long(reader, &block_count);
+	if (rval) {
+		return rval;
+	}
+
+	while (block_count != 0) {
+		if (block_count < 0) {
+			int64_t block_size;
+
+			/* Negating INT64_MIN is signed overflow (undefined
+			   behavior); treat such a count as malformed input */
+			if (block_count == INT64_MIN) {
+				return EILSEQ;
+			}
+			block_count = -block_count;
+			/* The size itself is unused here, but it has to be
+			   consumed to reach the first item of the block */
+			rval = enc->read_long(reader, &block_size);
+			if (rval) {
+				return rval;
+			}
+		}
+
+		for (i = 0; i < block_count; i++) {
+			rval = avro_skip_data(reader, writers_schema->items);
+			if (rval) {
+				return rval;
+			}
+		}
+
+		/* Next block header; zero ends the array */
+		rval = enc->read_long(reader, &block_count);
+		if (rval) {
+			return rval;
+		}
+	}
+	return 0;
+}
+
+/*
+ * Skip over a map value in the stream, using the writer's map schema.
+ *
+ * Reads a block count, then for each entry skips the key with
+ * enc->skip_string() and the value with avro_skip_data(); repeats until a
+ * block count of zero is read.  A negative block count is followed by one
+ * extra long (the block size); that long is read and discarded, and the
+ * absolute value of the count is used as the entry count.
+ *
+ * Returns 0 on success, EILSEQ if a negative block count cannot be negated,
+ * or the first non-zero code returned by the encoding or avro_skip_data().
+ */
+static int skip_map(avro_reader_t reader, const avro_encoding_t * enc,
+		    struct avro_map_schema_t *writers_schema)
+{
+	int rval;
+	int64_t i, block_count;
+
+	rval = enc->read_long(reader, &block_count);
+	if (rval) {
+		return rval;
+	}
+	while (block_count != 0) {
+		if (block_count < 0) {
+			int64_t block_size;
+
+			/* Negating INT64_MIN is signed overflow (undefined
+			   behavior); treat such a count as malformed input */
+			if (block_count == INT64_MIN) {
+				return EILSEQ;
+			}
+			block_count = -block_count;
+			/* The size itself is unused here, but it has to be
+			   consumed to reach the first entry of the block */
+			rval = enc->read_long(reader, &block_size);
+			if (rval) {
+				return rval;
+			}
+		}
+		for (i = 0; i < block_count; i++) {
+			/* Key first, then the value */
+			rval = enc->skip_string(reader);
+			if (rval) {
+				return rval;
+			}
+			/* writers_schema is already the map schema struct, so
+			   it must not be converted a second time */
+			rval = avro_skip_data(reader, writers_schema->values);
+			if (rval) {
+				return rval;
+			}
+		}
+		/* Next block header; zero ends the map */
+		rval = enc->read_long(reader, &block_count);
+		if (rval) {
+			return rval;
+		}
+	}
+	return 0;
+}
+
+/*
+ * Skip over a union value in the stream, using the writer's union schema.
+ *
+ * Reads the branch index as a long, walks the schema's branch list to the
+ * branch at that position, and skips one value of that branch's schema.
+ *
+ * Returns 0 on success, EILSEQ if the index is negative or past the last
+ * branch, or the non-zero code returned by enc->read_long() or
+ * avro_skip_data().
+ */
+static int skip_union(avro_reader_t reader, const avro_encoding_t * enc,
+		      struct avro_union_schema_t *writers_schema)
+{
+	int rval;
+	int64_t i, index;
+	struct avro_union_branch_t *branch;
+
+	rval = enc->read_long(reader, &index);
+	if (rval) {
+		return rval;
+	}
+	if (index < 0) {
+		return EILSEQ;
+	}
+
+	/* The position counter must advance together with the branch
+	   pointer, otherwise every non-zero index runs off the list */
+	branch = STAILQ_FIRST(&writers_schema->branches);
+	for (i = 0; i != index && branch != NULL; i++) {
+		branch = STAILQ_NEXT(branch, branches);
+	}
+	if (!branch) {
+		return EILSEQ;
+	}
+	return avro_skip_data(reader, branch->schema);
+}
+
+/*
+ * Skip over a record value in the stream by skipping each field of the
+ * writer's record schema in list order.
+ *
+ * Returns 0 when every field was skipped, otherwise the first non-zero
+ * code returned by avro_skip_data().  The enc argument is not used.
+ */
+static int skip_record(avro_reader_t reader, const avro_encoding_t * enc,
+		       struct avro_record_schema_t *writers_schema)
+{
+	struct avro_record_field_t *cur =
+	    STAILQ_FIRST(&writers_schema->fields);
+
+	while (cur != NULL) {
+		int status = avro_skip_data(reader, cur->type);
+		if (status) {
+			return status;
+		}
+		cur = STAILQ_NEXT(cur, fields);
+	}
+	return 0;
+}
+
+/*
+ * Skip one value of the given writer's schema in the reader's stream.
+ *
+ * Dispatches on avro_typeof(writers_schema): primitive types go to the
+ * matching skip_* function of the binary encoding, fixed values are skipped
+ * by their declared size, compound types go to the skip_* helpers in this
+ * file, and a link is followed to the schema it points to.
+ *
+ * Returns EINVAL when reader is NULL, when writers_schema fails
+ * is_avro_schema(), or when the type is not handled below; otherwise the
+ * result of the function that was dispatched to.
+ */
+int avro_skip_data(avro_reader_t reader, avro_schema_t writers_schema)
+{
+	const avro_encoding_t *codec = &avro_binary_encoding;
+
+	if (!reader || !is_avro_schema(writers_schema)) {
+		return EINVAL;
+	}
+
+	switch (avro_typeof(writers_schema)) {
+	case AVRO_NULL:
+		return codec->skip_null(reader);
+
+	case AVRO_BOOLEAN:
+		return codec->skip_boolean(reader);
+
+	case AVRO_STRING:
+		return codec->skip_string(reader);
+
+	case AVRO_INT32:
+		return codec->skip_int(reader);
+
+	case AVRO_INT64:
+		return codec->skip_long(reader);
+
+	case AVRO_FLOAT:
+		return codec->skip_float(reader);
+
+	case AVRO_DOUBLE:
+		return codec->skip_double(reader);
+
+	case AVRO_BYTES:
+		return codec->skip_bytes(reader);
+
+	case AVRO_FIXED:
+		return avro_skip(reader,
+				 avro_schema_to_fixed(writers_schema)->size);
+
+	case AVRO_ENUM:
+		/* Enum values are skipped with the long skipper */
+		return codec->skip_long(reader);
+
+	case AVRO_ARRAY:
+		return skip_array(reader, codec,
+				  avro_schema_to_array(writers_schema));
+
+	case AVRO_MAP:
+		return skip_map(reader, codec,
+				avro_schema_to_map(writers_schema));
+
+	case AVRO_UNION:
+		return skip_union(reader, codec,
+				  avro_schema_to_union(writers_schema));
+
+	case AVRO_RECORD:
+		return skip_record(reader, codec,
+				   avro_schema_to_record(writers_schema));
+
+	case AVRO_LINK:
+		return avro_skip_data(reader,
+				      (avro_schema_to_link(writers_schema))->
+				      to);
+
+	default:
+		break;
+	}
+
+	/* Type not handled above */
+	return EINVAL;
+}
Modified: hadoop/avro/trunk/lang/c/src/io.c
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/io.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/io.c (original)
+++ hadoop/avro/trunk/lang/c/src/io.c Thu Jan 28 02:06:31 2010
@@ -139,7 +139,7 @@
static int
avro_read_memory(struct avro_memory_reader_t *reader, void *buf, int64_t len)
{
- if (len) {
+ if (len > 0) {
if ((reader->len - reader->read) < len) {
return ENOSPC;
}
@@ -174,6 +174,42 @@
return EINVAL;
}
+/*
+ * Advance a memory reader's read offset by len bytes.
+ *
+ * A len of zero or less is a no-op that returns 0.  Returns ENOSPC, without
+ * moving the offset, when fewer than len bytes remain between reader->read
+ * and reader->len; returns 0 after a successful advance.
+ */
+static int avro_skip_memory(struct avro_memory_reader_t *reader, int64_t len)
+{
+	if (len <= 0) {
+		return 0;
+	}
+	if ((reader->len - reader->read) < len) {
+		return ENOSPC;
+	}
+	reader->read += len;
+	return 0;
+}
+
+/*
+ * Advance a file reader by len bytes, seeking forward from the current
+ * position of reader->fp.
+ *
+ * A len of zero or less is a no-op that returns 0.  Returns EINVAL when len
+ * does not fit in the long offset that fseek() takes, an errno-style code
+ * when the seek fails (EIO if errno was left unset), and 0 on success.
+ */
+static int avro_skip_file(struct avro_file_reader_t *reader, int64_t len)
+{
+	long offset;
+
+	if (len <= 0) {
+		return 0;
+	}
+
+	/* fseek() takes a long, which may be narrower than int64_t; refuse
+	   the request rather than seek by a truncated distance */
+	offset = (long)len;
+	if ((int64_t) offset != len) {
+		return EINVAL;
+	}
+
+	/* Any non-zero result is a failure; report it as an errno-style
+	   code like the other reader helpers do, not as a raw -1 */
+	if (fseek(reader->fp, offset, SEEK_CUR) != 0) {
+		return errno ? errno : EIO;
+	}
+	return 0;
+}
+
+/*
+ * Skip len bytes of input on the given reader.
+ *
+ * Dispatches to the memory or file implementation depending on the kind of
+ * reader.  Returns that implementation's result, or EINVAL when len is
+ * negative or the reader is neither a memory nor a file reader, so that a
+ * request that skipped nothing is never reported as success.
+ */
+int avro_skip(avro_reader_t reader, int64_t len)
+{
+	if (len >= 0) {
+		if (is_memory_io(reader)) {
+			return avro_skip_memory(avro_reader_to_memory(reader),
+						len);
+		} else if (is_file_io(reader)) {
+			return avro_skip_file(avro_reader_to_file(reader), len);
+		}
+	}
+	return EINVAL;
+}
+
static int
avro_write_memory(struct avro_memory_writer_t *writer, void *buf, int64_t len)
{
@@ -206,7 +242,7 @@
if (is_memory_io(writer)) {
return avro_write_memory(avro_writer_to_memory(writer),
buf, len);
- } else if (is_memory_io(writer)) {
+ } else if (is_file_io(writer)) {
return avro_write_file(avro_writer_to_file(writer), buf,
len);
}
@@ -214,14 +250,6 @@
return EINVAL;
}
-int avro_skip(avro_reader_t reader, int64_t len)
-{
- /*
- * TODO
- */
- return -1;
-}
-
void avro_writer_dump(avro_writer_t writer, FILE * fp)
{
if (is_memory_io(writer)) {
Modified: hadoop/avro/trunk/lang/c/tests/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/tests/Makefile.am?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/tests/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c/tests/Makefile.am Thu Jan 28 02:06:31 2010
@@ -15,7 +15,4 @@
test_avro_data_SOURCES=test_avro_data.c
test_avro_data_LDADD=$(test_LDADD)
-test_avro_interop_SOURCES=test_avro_interop.c
-test_avro_interop_LDADD=$(test_LDADD)
-
TESTS=$(check_PROGRAMS) test_valgrind
Modified: hadoop/avro/trunk/lang/c/tests/test_valgrind
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/tests/test_valgrind?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/tests/test_valgrind (original)
+++ hadoop/avro/trunk/lang/c/tests/test_valgrind Thu Jan 28 02:06:31 2010
@@ -22,8 +22,7 @@
exit 77
fi
-LD_LIBRARY_PATH="../src/.libs/:${LD_LIBRARY_PATH}"
-valgrind --leak-check=full --show-reachable=yes -q .libs/test_avro_data 2>&1 |\
+LD_LIBRARY_PATH="../src/.libs/" valgrind --leak-check=full --show-reachable=yes -q .libs/test_avro_data 2>&1 |\
grep -E '^==[0-9]+== '
if [ $? -eq 0 ]; then
# Expression found. Test failed.
Modified: hadoop/avro/trunk/lang/c/version.sh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/version.sh?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/version.sh (original)
+++ hadoop/avro/trunk/lang/c/version.sh Thu Jan 28 02:06:31 2010
@@ -18,7 +18,7 @@
# libavro_binary_age = 0
# libavro_interface_age = 0
#
-libavro_micro_version=14
+libavro_micro_version=15
libavro_interface_age=0
libavro_binary_age=0