You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@avro.apache.org by dk...@apache.org on 2018/11/29 21:26:27 UTC

[avro] branch master updated: C: Allow file with no records. (#160)

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new b991c93  C: Allow file with no records. (#160)
b991c93 is described below

commit b991c93ad1238a1b128213a5d7ef754ddeaa827e
Author: walshb <wa...@users.noreply.github.com>
AuthorDate: Thu Nov 29 21:26:23 2018 +0000

    C: Allow file with no records. (#160)
---
 lang/c/src/datafile.c         |  30 +++++--
 lang/c/tests/CMakeLists.txt   |   1 +
 lang/c/tests/test_avro_1906.c | 204 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 227 insertions(+), 8 deletions(-)

diff --git a/lang/c/src/datafile.c b/lang/c/src/datafile.c
index 95096c1..4dad1ba 100644
--- a/lang/c/src/datafile.c
+++ b/lang/c/src/datafile.c
@@ -527,7 +527,9 @@ int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
 	r->current_blocklen = 0;
 
 	rval = file_read_block_count(r);
-	if (rval) {
+	if (rval == EOF) {
+		r->blocks_total = 0;
+	} else if (rval) {
 		avro_reader_free(r->reader);
 		avro_codec_reset(r->codec);
 		avro_freet(struct avro_codec_t_, r->codec);
@@ -536,7 +538,7 @@ int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
 	}
 
 	*reader = r;
-	return rval;
+	return 0;
 }
 
 int avro_file_reader(const char *path, avro_file_reader_t * reader)
@@ -692,10 +694,11 @@ int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
 	check_param(EINVAL, r, "reader");
 	check_param(EINVAL, datum, "datum");
 
-	check(rval,
-	      avro_read_data(r->block_reader, r->writers_schema, readers_schema,
-			     datum));
-	r->blocks_read++;
+	/* This will be set to zero when an empty file is opened.
+	 * Return EOF here when the user attempts to read. */
+	if (r->blocks_total == 0) {
+		return EOF;
+	}
 
 	if (r->blocks_read == r->blocks_total) {
 		check(rval, avro_read(r->reader, sync, sizeof(sync)));
@@ -704,9 +707,14 @@ int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
 			avro_set_error("Incorrect sync bytes");
 			return EILSEQ;
 		}
-		/* For now, ignore errors (e.g. EOF) */
-		file_read_block_count(r);
+		check(rval, file_read_block_count(r));
 	}
+
+	check(rval,
+	      avro_read_data(r->block_reader, r->writers_schema, readers_schema,
+			     datum));
+	r->blocks_read++;
+
 	return 0;
 }
 
@@ -719,6 +727,12 @@ avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
 	check_param(EINVAL, r, "reader");
 	check_param(EINVAL, value, "value");
 
+	/* This will be set to zero when an empty file is opened.
+	 * Return EOF here when the user attempts to read. */
+	if (r->blocks_total == 0) {
+		return EOF;
+	}
+
 	if (r->blocks_read == r->blocks_total) {
 		/* reads sync bytes and buffers further bytes */
 		check(rval, avro_read(r->reader, sync, sizeof(sync)));
diff --git a/lang/c/tests/CMakeLists.txt b/lang/c/tests/CMakeLists.txt
index c66f67c..1d533c6 100644
--- a/lang/c/tests/CMakeLists.txt
+++ b/lang/c/tests/CMakeLists.txt
@@ -64,4 +64,5 @@ add_avro_test(test_refcount)
 add_avro_test(test_cpp test_cpp.cpp)
 add_avro_test(test_avro_1379)
 add_avro_test(test_avro_1691)
+add_avro_test(test_avro_1906)
 add_avro_test(test_avro_1904)
diff --git a/lang/c/tests/test_avro_1906.c b/lang/c/tests/test_avro_1906.c
new file mode 100644
index 0000000..574cd6d
--- /dev/null
+++ b/lang/c/tests/test_avro_1906.c
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/stat.h>
+#include "avro.h"
+/* Scratch file: created by write_data() and removed again by test_n_records(). */
+static const char *filename = "avro_file.dat";
+/* Writer schema: record "Person" with a single int field "ab". */
+static const char PERSON_SCHEMA[] =
+	"{"
+	"    \"type\":\"record\","
+	"    \"name\":\"Person\","
+	"    \"fields\": ["
+	"        {\"name\": \"ab\", \"type\": \"int\"}"
+	"    ]"
+	"}";
+/* Read filename with the avro_value_t API; return the number of records read, or -1 on error. */
+static int read_data(void) {
+	int rval;
+	int records_read = 0;
+
+	avro_file_reader_t reader;
+	avro_value_iface_t *iface;
+	avro_value_t value;
+
+	fprintf(stderr, "\nReading...\n");
+
+	rval = avro_file_reader(filename, &reader);
+
+	if (rval) {
+		fprintf(stderr, "Error: %s\n", avro_strerror());
+		return -1;
+	}
+
+	avro_schema_t schema = avro_file_reader_get_writer_schema(reader);	/* NOTE(review): decref'd below, so assumed to be a new reference - confirm */
+
+	iface = avro_generic_class_from_schema(schema);
+	avro_generic_value_new(iface, &value);
+
+	while ((rval = avro_file_reader_read_value(reader, &value)) == 0) {
+		avro_value_t field;
+		int32_t val = 0;	/* defined value even if a getter below fails */
+		avro_value_get_by_index(&value, 0, &field, NULL);
+		avro_value_get_int(&field, &val);
+		fprintf(stderr, "value = %d\n", val);
+		records_read++;
+		avro_value_reset(&value);
+	}
+
+	avro_value_decref(&value);
+	avro_value_iface_decref(iface);
+	avro_schema_decref(schema);
+	avro_file_reader_close(reader);
+
+	fprintf(stderr, "read %d records.\n", records_read);
+
+	if (rval != EOF) {	/* EOF is the normal end of iteration, including for an empty file */
+		fprintf(stderr, "Error: %s\n", avro_strerror());
+		return -1;
+	}
+
+	return records_read;
+}
+/* Read filename with the avro_datum_t API (avro_file_reader_read); return records read, or -1 on error. */
+static int read_data_datum(void) {
+	int rval;
+	int records_read = 0;
+	avro_file_reader_t reader;
+	avro_datum_t datum;
+
+	fprintf(stderr, "\nReading...\n");
+
+	rval = avro_file_reader(filename, &reader);
+
+	if (rval) {
+		fprintf(stderr, "Error using 'datum': %s\n", avro_strerror());
+		return -1;
+	}
+
+	avro_schema_t schema = avro_file_reader_get_writer_schema(reader);	/* NOTE(review): decref'd below, so assumed to be a new reference - confirm */
+
+	while ((rval = avro_file_reader_read(reader, schema, &datum)) == 0) {
+		avro_datum_t val_datum;
+		int32_t val = 0;	/* defined value even if avro_int32_get fails */
+		if (avro_record_get(datum, "ab", &val_datum)) {
+			fprintf(stderr, "Error getting value: %s\n", avro_strerror());
+			avro_datum_decref(datum);
+			break;	/* rval is still 0 (!= EOF): resources are released below, then -1 is returned */
+		}
+		avro_int32_get(val_datum, &val);
+		fprintf(stderr, "value = %d\n", val);
+		records_read++;
+		avro_datum_decref(datum);
+	}
+
+	avro_schema_decref(schema);
+	avro_file_reader_close(reader);
+
+	fprintf(stderr, "read %d records using 'datum'.\n", records_read);
+
+	if (rval != EOF) {	/* EOF is the normal end of iteration, including for an empty file */
+		fprintf(stderr, "Error using 'datum': %s\n", avro_strerror());
+		return -1;
+	}
+
+	return records_read;
+}
+/* Write n_records copies of the record {"ab": 123} to filename; return n_records, or -1 on error. */
+static int write_data(int n_records) {
+	int  i;
+	int rval = n_records;
+	avro_schema_t schema;
+	avro_schema_error_t error;
+	avro_file_writer_t writer;
+	avro_value_iface_t *iface;
+	avro_value_t value;
+	avro_value_t field;
+
+	fprintf(stderr, "\nWriting...\n");
+	if (avro_schema_from_json(PERSON_SCHEMA, 0, &schema, &error)) {
+		fprintf(stderr, "Unable to parse schema\n");
+		return -1;
+	}
+
+	if (avro_file_writer_create(filename, schema, &writer)) {
+		fprintf(stderr, "There was an error creating file: %s\n", avro_strerror());
+		avro_schema_decref(schema);
+		return -1;
+	}
+
+	iface = avro_generic_class_from_schema(schema);
+	avro_generic_value_new(iface, &value);
+	avro_value_get_by_index(&value, 0, &field, NULL);
+	avro_value_set_int(&field, 123);
+
+	/* On append failure stop early, but still close the writer and release everything below. */
+	for (i = 0; i < n_records; i++) {
+		if (avro_file_writer_append_value(writer, &value)) {
+			fprintf(stderr, "There was an error writing file: %s\n", avro_strerror());
+			rval = -1;
+			break;
+		}
+	}
+
+	if (avro_file_writer_close(writer)) {
+		fprintf(stderr, "There was an error closing file: %s\n", avro_strerror());
+		rval = -1;
+	}
+
+	avro_value_decref(&value);
+	avro_value_iface_decref(iface);
+	avro_schema_decref(schema);
+	return rval;
+}
+/* Round-trip n_records through filename with both read APIs; return 0 on success, -1 otherwise. */
+static int test_n_records(int n_records) {
+	/* Every stage must report exactly n_records; the scratch file is removed on every path. */
+
+	if (write_data(n_records) != n_records) {
+		remove(filename);
+		return -1;
+	}
+
+	if (read_data() != n_records) {
+		remove(filename);
+		return -1;
+	}
+
+	if (read_data_datum() != n_records) {
+		remove(filename);
+		return -1;
+	}
+
+	remove(filename);
+	return 0;
+}
+/* Check a one-record file, then an empty (zero-record) file, which must read back as 0 records. */
+int main(void)
+{
+	if (test_n_records(1) < 0) {
+		return EXIT_FAILURE;
+	}
+
+	if (test_n_records(0) < 0) {
+		return EXIT_FAILURE;
+	}
+
+	return EXIT_SUCCESS;
+}