You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2018/11/29 21:26:27 UTC
[avro] branch master updated: C: Allow file with no records. (#160)
This is an automated email from the ASF dual-hosted git repository.
dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new b991c93 C: Allow file with no records. (#160)
b991c93 is described below
commit b991c93ad1238a1b128213a5d7ef754ddeaa827e
Author: walshb <wa...@users.noreply.github.com>
AuthorDate: Thu Nov 29 21:26:23 2018 +0000
C: Allow file with no records. (#160)
---
lang/c/src/datafile.c | 30 +++++--
lang/c/tests/CMakeLists.txt | 1 +
lang/c/tests/test_avro_1906.c | 204 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 227 insertions(+), 8 deletions(-)
diff --git a/lang/c/src/datafile.c b/lang/c/src/datafile.c
index 95096c1..4dad1ba 100644
--- a/lang/c/src/datafile.c
+++ b/lang/c/src/datafile.c
@@ -527,7 +527,9 @@ int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
r->current_blocklen = 0;
rval = file_read_block_count(r);
- if (rval) {
+ if (rval == EOF) {
+ r->blocks_total = 0;
+ } else if (rval) {
avro_reader_free(r->reader);
avro_codec_reset(r->codec);
avro_freet(struct avro_codec_t_, r->codec);
@@ -536,7 +538,7 @@ int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
}
*reader = r;
- return rval;
+ return 0;
}
int avro_file_reader(const char *path, avro_file_reader_t * reader)
@@ -692,10 +694,11 @@ int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
check_param(EINVAL, r, "reader");
check_param(EINVAL, datum, "datum");
- check(rval,
- avro_read_data(r->block_reader, r->writers_schema, readers_schema,
- datum));
- r->blocks_read++;
+ /* blocks_total is set to zero when an empty file is opened.
+ * Return EOF here as soon as the user attempts to read. */
+ if (r->blocks_total == 0) {
+ return EOF;
+ }
if (r->blocks_read == r->blocks_total) {
check(rval, avro_read(r->reader, sync, sizeof(sync)));
@@ -704,9 +707,14 @@ int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
avro_set_error("Incorrect sync bytes");
return EILSEQ;
}
- /* For now, ignore errors (e.g. EOF) */
- file_read_block_count(r);
+ check(rval, file_read_block_count(r));
}
+
+ check(rval,
+ avro_read_data(r->block_reader, r->writers_schema, readers_schema,
+ datum));
+ r->blocks_read++;
+
return 0;
}
@@ -719,6 +727,12 @@ avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
check_param(EINVAL, r, "reader");
check_param(EINVAL, value, "value");
+ /* blocks_total is set to zero when an empty file is opened.
+ * Return EOF here as soon as the user attempts to read. */
+ if (r->blocks_total == 0) {
+ return EOF;
+ }
+
if (r->blocks_read == r->blocks_total) {
/* reads sync bytes and buffers further bytes */
check(rval, avro_read(r->reader, sync, sizeof(sync)));
diff --git a/lang/c/tests/CMakeLists.txt b/lang/c/tests/CMakeLists.txt
index c66f67c..1d533c6 100644
--- a/lang/c/tests/CMakeLists.txt
+++ b/lang/c/tests/CMakeLists.txt
@@ -64,4 +64,5 @@ add_avro_test(test_refcount)
add_avro_test(test_cpp test_cpp.cpp)
add_avro_test(test_avro_1379)
add_avro_test(test_avro_1691)
+add_avro_test(test_avro_1906)
add_avro_test(test_avro_1904)
diff --git a/lang/c/tests/test_avro_1906.c b/lang/c/tests/test_avro_1906.c
new file mode 100644
index 0000000..574cd6d
--- /dev/null
+++ b/lang/c/tests/test_avro_1906.c
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/stat.h>
+#include "avro.h"
+
+/* Path of the scratch data file that the test writes, reads back and removes. */
+static const char *filename = "avro_file.dat";
+
+/* JSON schema used for every record: a record named "Person" with a single
+ * int field named "ab". */
+static const char PERSON_SCHEMA[] =
+ "{"
+ " \"type\":\"record\","
+ " \"name\":\"Person\","
+ " \"fields\": ["
+ " {\"name\": \"ab\", \"type\": \"int\"}"
+ " ]"
+ "}";
+
+/*
+ * Reads every record from the test file through the avro_value_t API and
+ * prints field 0 of each record to stderr.
+ *
+ * Returns the number of records read, or -1 if the file cannot be opened,
+ * the generic value cannot be created, a field cannot be decoded, or reading
+ * stops for any reason other than EOF.
+ */
+static int read_data(void) {
+    int rval;
+    int records_read = 0;
+    int result = -1;
+
+    avro_file_reader_t reader;
+    avro_schema_t schema;
+    avro_value_iface_t *iface;
+    avro_value_t value;
+
+    fprintf(stderr, "\nReading...\n");
+
+    rval = avro_file_reader(filename, &reader);
+    if (rval) {
+        fprintf(stderr, "Error: %s\n", avro_strerror());
+        return -1;
+    }
+
+    schema = avro_file_reader_get_writer_schema(reader);
+
+    iface = avro_generic_class_from_schema(schema);
+    if (iface == NULL) {
+        fprintf(stderr, "Error: %s\n", avro_strerror());
+        goto out_reader;
+    }
+
+    if (avro_generic_value_new(iface, &value)) {
+        fprintf(stderr, "Error: %s\n", avro_strerror());
+        goto out_iface;
+    }
+
+    while ((rval = avro_file_reader_read_value(reader, &value)) == 0) {
+        avro_value_t field;
+        int32_t val;
+
+        if (avro_value_get_by_index(&value, 0, &field, NULL) ||
+            avro_value_get_int(&field, &val)) {
+            fprintf(stderr, "Error: %s\n", avro_strerror());
+            goto out_value;
+        }
+        fprintf(stderr, "value = %d\n", val);
+        records_read++;
+        avro_value_reset(&value);
+    }
+
+    fprintf(stderr, "read %d records.\n", records_read);
+
+    if (rval != EOF) {
+        /* Report before cleanup so later calls cannot replace the message. */
+        fprintf(stderr, "Error: %s\n", avro_strerror());
+        goto out_value;
+    }
+
+    result = records_read;
+
+out_value:
+    avro_value_decref(&value);
+out_iface:
+    avro_value_iface_decref(iface);
+out_reader:
+    avro_schema_decref(schema);
+    avro_file_reader_close(reader);
+    return result;
+}
+
+/*
+ * Reads every record from the test file through the avro_datum_t API, using
+ * the file's writer schema as the reader schema, and prints the "ab" field
+ * of each record to stderr.
+ *
+ * Returns the number of records read, or -1 if the file cannot be opened,
+ * the "ab" field cannot be extracted, or reading stops for any reason other
+ * than EOF. Once the file is open, the schema reference and the reader are
+ * released on every path.
+ */
+static int read_data_datum(void) {
+    int rval;
+    int records_read = 0;
+    int result = -1;
+
+    avro_file_reader_t reader;
+    avro_schema_t schema;
+    avro_datum_t datum;
+
+    fprintf(stderr, "\nReading...\n");
+
+    rval = avro_file_reader(filename, &reader);
+    if (rval) {
+        fprintf(stderr, "Error using 'datum': %s\n", avro_strerror());
+        return -1;
+    }
+
+    schema = avro_file_reader_get_writer_schema(reader);
+
+    while ((rval = avro_file_reader_read(reader, schema, &datum)) == 0) {
+        avro_datum_t val_datum;
+        int32_t val;
+
+        if (avro_record_get(datum, "ab", &val_datum) ||
+            avro_int32_get(val_datum, &val)) {
+            fprintf(stderr, "Error getting value: %s\n", avro_strerror());
+            /* Drop the record we were handed before bailing out. */
+            avro_datum_decref(datum);
+            goto out;
+        }
+        fprintf(stderr, "value = %d\n", val);
+        records_read++;
+        avro_datum_decref(datum);
+    }
+
+    fprintf(stderr, "read %d records using 'datum'.\n", records_read);
+
+    if (rval != EOF) {
+        /* Report before cleanup so later calls cannot replace the message. */
+        fprintf(stderr, "Error using 'datum': %s\n", avro_strerror());
+        goto out;
+    }
+
+    result = records_read;
+
+out:
+    avro_schema_decref(schema);
+    avro_file_reader_close(reader);
+    return result;
+}
+
+/*
+ * Creates the test file with PERSON_SCHEMA and appends n_records copies of
+ * a record whose field 0 is set to 123. With n_records == 0 the file is
+ * created and closed without any record being appended.
+ *
+ * Returns n_records on success, or -1 if the schema cannot be parsed, the
+ * file cannot be created, the value cannot be set up, or appending or
+ * closing fails. Everything acquired so far is released on every path.
+ */
+static int write_data(int n_records) {
+    int i;
+    int result = -1;
+    int writer_open = 0;
+    avro_schema_t schema;
+    avro_schema_error_t error;
+    avro_file_writer_t writer;
+    avro_value_iface_t *iface;
+    avro_value_t value;
+    avro_value_t field;
+
+    fprintf(stderr, "\nWriting...\n");
+
+    if (avro_schema_from_json(PERSON_SCHEMA, 0, &schema, &error)) {
+        fprintf(stderr, "Unable to parse schema\n");
+        return -1;
+    }
+
+    if (avro_file_writer_create(filename, schema, &writer)) {
+        fprintf(stderr, "There was an error creating file: %s\n", avro_strerror());
+        goto out_schema;
+    }
+    writer_open = 1;
+
+    iface = avro_generic_class_from_schema(schema);
+    if (iface == NULL) {
+        fprintf(stderr, "There was an error creating value: %s\n", avro_strerror());
+        goto out_writer;
+    }
+
+    if (avro_generic_value_new(iface, &value)) {
+        fprintf(stderr, "There was an error creating value: %s\n", avro_strerror());
+        goto out_iface;
+    }
+
+    if (avro_value_get_by_index(&value, 0, &field, NULL) ||
+        avro_value_set_int(&field, 123)) {
+        fprintf(stderr, "There was an error setting value: %s\n", avro_strerror());
+        goto out_value;
+    }
+
+    for (i = 0; i < n_records; i++) {
+        if (avro_file_writer_append_value(writer, &value)) {
+            fprintf(stderr, "There was an error writing file: %s\n", avro_strerror());
+            goto out_value;
+        }
+    }
+
+    /* The close result is part of the test, so check it on the success path
+     * and make sure the cleanup below does not close the writer again. */
+    writer_open = 0;
+    if (avro_file_writer_close(writer)) {
+        fprintf(stderr, "There was an error closing file: %s\n", avro_strerror());
+        goto out_value;
+    }
+
+    result = n_records;
+
+out_value:
+    avro_value_decref(&value);
+out_iface:
+    avro_value_iface_decref(iface);
+out_writer:
+    if (writer_open) {
+        /* Best-effort close on an error path; the failure was already reported. */
+        avro_file_writer_close(writer);
+    }
+out_schema:
+    avro_schema_decref(schema);
+    return result;
+}
+
+/*
+ * Round-trips n_records records: writes the test file, reads it back with
+ * read_data() and then read_data_datum(), and removes the file afterwards.
+ * Later steps are skipped as soon as one step reports a different count.
+ *
+ * Returns 0 when every step reports exactly n_records, -1 otherwise.
+ */
+static int test_n_records(int n_records) {
+    int ok = write_data(n_records) == n_records
+          && read_data() == n_records
+          && read_data_datum() == n_records;
+
+    /* The scratch file is removed whether or not the round trip succeeded. */
+    remove(filename);
+
+    return ok ? 0 : -1;
+}
+
+/*
+ * Runs the round-trip test for a file holding one record and then for a
+ * file holding no records; exits with failure on the first case that fails.
+ */
+int main()
+{
+    static const int record_counts[] = { 1, 0 };
+    size_t idx;
+
+    for (idx = 0; idx < sizeof(record_counts) / sizeof(record_counts[0]); idx++) {
+        if (test_n_records(record_counts[idx]) < 0) {
+            return EXIT_FAILURE;
+        }
+    }
+
+    return EXIT_SUCCESS;
+}