You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@avro.apache.org by dc...@apache.org on 2011/12/20 03:27:45 UTC

svn commit: r1221076 - in /avro/trunk: CHANGES.txt lang/c/CMakeLists.txt lang/c/examples/quickstop.c lang/c/src/CMakeLists.txt lang/c/src/avro-c.pc.in lang/c/src/avro/io.h lang/c/src/codec.c lang/c/src/codec.h lang/c/src/datafile.c

Author: dcreager
Date: Tue Dec 20 02:27:44 2011
New Revision: 1221076

URL: http://svn.apache.org/viewvc?rev=1221076&view=rev
Log:
AVRO-957. C: Codec support

The C library now supports reading and writing Avro data files that have
compressed data blocks.  We currently support the deflate and lzma
codecs.  The codec API is nice and modular, so it should be easy to add
snappy as well.  Contributed by Michael Cooper and Lucas Martin-King.

Added:
    avro/trunk/lang/c/src/codec.c
    avro/trunk/lang/c/src/codec.h
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/c/CMakeLists.txt
    avro/trunk/lang/c/examples/quickstop.c
    avro/trunk/lang/c/src/CMakeLists.txt
    avro/trunk/lang/c/src/avro-c.pc.in
    avro/trunk/lang/c/src/avro/io.h
    avro/trunk/lang/c/src/datafile.c

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Dec 20 02:27:44 2011
@@ -33,6 +33,9 @@ Avro 1.6.2 (unreleased)
     AVRO-961. C: avrocat/avropipe can now read from stdin.
     (Michael Cooper via dcreager)
 
+    AVRO-957. C: Codec support in C library.  (Michael Cooper and Lucas
+    Martin-King via dcreager)
+
   BUG FIXES
 
     AVRO-962. Java: Fix Maven plugin to support string type override.

Modified: avro/trunk/lang/c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/CMakeLists.txt?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/lang/c/CMakeLists.txt (original)
+++ avro/trunk/lang/c/CMakeLists.txt Tue Dec 20 02:27:44 2011
@@ -20,6 +20,10 @@ cmake_minimum_required(VERSION 2.4)
 project(AvroC)
 enable_testing()
 
+# Eliminates warning about linker paths when linking both zlib and
+# liblzma.
+cmake_policy(SET CMP0003 NEW)
+
 #-----------------------------------------------------------------------
 # Retrieve the current version number
 
@@ -69,6 +73,7 @@ string(REGEX REPLACE ".*\\..*\\.[0-9]+(.
 # Source package support
 
 include(CPackConfig.txt)
+include(CheckLibraryExists)
 
 
 if(APPLE)
@@ -87,6 +92,34 @@ endif(CMAKE_COMPILER_IS_GNUCC)
 include_directories(${AvroC_SOURCE_DIR}/src)
 include_directories(${AvroC_SOURCE_DIR}/jansson/src)
 
+
+# Enable codecs
+find_package(ZLIB)
+if (ZLIB_FOUND)
+    set(ZLIB_PKG zlib)
+    add_definitions(-DDEFLATE_CODEC)
+    include_directories(${ZLIB_INCLUDE_DIR})
+    message("Enabled deflate codec")
+else (ZLIB_FOUND)
+    message("Disabled deflate codec. zlib not found.")
+endif (ZLIB_FOUND)
+
+find_package(PkgConfig)
+pkg_check_modules(LZMA liblzma)
+if (LZMA_FOUND)
+    set(LZMA_PKG liblzma)
+    add_definitions(-DLZMA_CODEC)
+    include_directories(${LZMA_INCLUDE_DIRS})
+    link_directories(${LZMA_LIBRARY_DIRS})
+    message("Enabled lzma codec")
+else (LZMA_FOUND)
+    message("Disabled lzma codec. liblzma not found.")
+endif (LZMA_FOUND)
+
+set(CODEC_LIBRARIES ${ZLIB_LIBRARIES} ${LZMA_LIBRARIES})
+set(CODEC_PKG "@ZLIB_PKG@ @LZMA_PKG@")
+
+
 add_subdirectory(src)
 add_subdirectory(examples)
 add_subdirectory(tests)

Modified: avro/trunk/lang/c/examples/quickstop.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/examples/quickstop.c?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/lang/c/examples/quickstop.c (original)
+++ avro/trunk/lang/c/examples/quickstop.c Tue Dec 20 02:27:44 2011
@@ -64,13 +64,13 @@ add_person(avro_file_writer_t db, const 
 	    || avro_record_set(person, "Last", last_datum)
 	    || avro_record_set(person, "Age", age_datum)
 	    || avro_record_set(person, "Phone", phone_datum)) {
-		fprintf(stderr, "Unable to create Person datum structure");
+		fprintf(stderr, "Unable to create Person datum structure\n");
 		exit(EXIT_FAILURE);
 	}
 
 	if (avro_file_writer_append(db, person)) {
 		fprintf(stderr,
-			"Unable to write Person datum to memory buffer");
+			"Unable to write Person datum to memory buffer\nMessage: %s\n", avro_strerror());
 		exit(EXIT_FAILURE);
 	}
 
@@ -142,9 +142,10 @@ int main(void)
 	/* Delete the database if it exists */
 	unlink(dbname);
 	/* Create a new database */
-	rval = avro_file_writer_create(dbname, person_schema, &db);
+	rval = avro_file_writer_create_with_codec(dbname, person_schema, &db, "deflate", 0);
 	if (rval) {
 		fprintf(stderr, "There was an error creating %s\n", dbname);
+		fprintf(stderr, " error message: %s\n", avro_strerror());
 		exit(EXIT_FAILURE);
 	}
 
@@ -159,6 +160,11 @@ int main(void)
 		add_person(db, "Bob", "Silent", "(555) 123-6422", 29);
 		add_person(db, "Jay", "???", number, 26);
 	}
+
+	/* Close the block and open a new one */
+	avro_file_writer_flush(db);
+	add_person(db, "Super", "Man", "123456", 31);
+
 	avro_file_writer_close(db);
 
 	fprintf(stdout, "\nNow let's read all the records back out\n");
@@ -170,7 +176,7 @@ int main(void)
 	}
 	for (i = 0; i < id; i++) {
 		if (print_person(dbreader, NULL)) {
-			fprintf(stderr, "Error printing person\n");
+			fprintf(stderr, "Error printing person\nMessage: %s\n", avro_strerror());
 			exit(EXIT_FAILURE);
 		}
 	}

Modified: avro/trunk/lang/c/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/CMakeLists.txt?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/lang/c/src/CMakeLists.txt (original)
+++ avro/trunk/lang/c/src/CMakeLists.txt Tue Dec 20 02:27:44 2011
@@ -35,6 +35,8 @@ set(AVRO_SRC
     avro/value.h
     avro_generic_internal.h
     avro_private.h
+    codec.c
+    codec.h
     consumer.c
     consume-binary.c
     datafile.c
@@ -100,8 +102,10 @@ source_group(Jansson FILES ${JANSSON_SRC
 string(REPLACE ":" "." LIBAVRO_DOT_VERSION ${LIBAVRO_VERSION})
 
 add_library(avro-static STATIC ${AVRO_SRC} ${JANSSON_SRC})
+target_link_libraries(avro-static ${CODEC_LIBRARIES})
 set_target_properties(avro-static PROPERTIES OUTPUT_NAME avro)
 add_library(avro-shared SHARED ${AVRO_SRC} ${JANSSON_SRC})
+target_link_libraries(avro-shared ${CODEC_LIBRARIES})
 set_target_properties(avro-shared PROPERTIES
         OUTPUT_NAME avro
         SOVERSION ${LIBAVRO_DOT_VERSION})
@@ -128,6 +132,7 @@ install(FILES ${CMAKE_CURRENT_BINARY_DIR
         DESTINATION lib/pkgconfig)
 
 add_executable(avrocat avrocat.c)
+
 target_link_libraries(avrocat avro-static)
 install(TARGETS avrocat RUNTIME DESTINATION bin)
 

Modified: avro/trunk/lang/c/src/avro-c.pc.in
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro-c.pc.in?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro-c.pc.in (original)
+++ avro/trunk/lang/c/src/avro-c.pc.in Tue Dec 20 02:27:44 2011
@@ -4,3 +4,4 @@ Version: @VERSION@
 URL: http://avro.apache.org/
 Libs: -L@prefix@/lib -lavro
 Cflags: -I@prefix@/include
+Requires: @CODEC_PKG@

Modified: avro/trunk/lang/c/src/avro/io.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/io.h?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro/io.h (original)
+++ avro/trunk/lang/c/src/avro/io.h Tue Dec 20 02:27:44 2011
@@ -99,6 +99,9 @@ typedef struct avro_file_writer_t_ *avro
 
 int avro_file_writer_create(const char *path, avro_schema_t schema,
 			    avro_file_writer_t * writer);
+int avro_file_writer_create_with_codec(const char *path,
+				avro_schema_t schema, avro_file_writer_t * writer,
+				const char *codec, size_t block_size);
 int avro_file_writer_open(const char *path, avro_file_writer_t * writer);
 int avro_file_reader(const char *path, avro_file_reader_t * reader);
 int avro_file_reader_fp(FILE *fp, const char *path, int should_close,

Added: avro/trunk/lang/c/src/codec.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/codec.c?rev=1221076&view=auto
==============================================================================
--- avro/trunk/lang/c/src/codec.c (added)
+++ avro/trunk/lang/c/src/codec.c Tue Dec 20 02:27:44 2011
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License. 
+ */
+
+#include <stdint.h>
+#include <string.h>
+#ifdef DEFLATE_CODEC
+#include <zlib.h>
+#endif
+#ifdef LZMA_CODEC
+#include <lzma.h>
+#endif
+#include "avro/errors.h"
+#include "avro/allocation.h"
+#include "codec.h"
+
+/* Initial size of a decompression output buffer; doubled on demand. */
+#define DEFAULT_BLOCK_SIZE	(16 * 1024)
+
+/* NULL codec: blocks are stored uncompressed. */
+
+static int
+codec_null(avro_codec_t codec)
+{
+	codec->name = "null";
+	codec->type = AVRO_CODEC_NULL;
+	codec->block_size = 0;
+	codec->used_size = 0;
+	codec->block_data = NULL;
+	codec->codec_data = NULL;
+
+	return 0;
+}
+
+/* The null codec never copies: block_data points at the caller's data. */
+static int encode_null(avro_codec_t c, void * data, int64_t len)
+{
+	c->block_data = data;
+	c->block_size = len;
+	c->used_size = len;
+
+	return 0;
+}
+
+static int decode_null(avro_codec_t c, void * data, int64_t len)
+{
+	c->block_data = data;
+	c->block_size = len;
+	c->used_size = len;
+
+	return 0;
+}
+
+/* Nothing to free: block_data was borrowed from the caller. */
+static int reset_null(avro_codec_t c)
+{
+	c->block_data = NULL;
+	c->block_size = 0;
+	c->used_size = 0;
+	c->codec_data = NULL;
+
+	return 0;
+}
+
+#if defined(DEFLATE_CODEC) || defined(LZMA_CODEC)
+/*
+ * Make sure the codec-owned output buffer holds at least min_size bytes.
+ * For these codecs c->block_size is the allocated capacity (the size
+ * handed back to avro_realloc/avro_free) and c->used_size is the number
+ * of valid output bytes.  On allocation failure the old buffer is kept,
+ * so the reset function can still free it, and 1 is returned.
+ */
+static int
+codec_reserve(avro_codec_t c, int64_t min_size, const char *what)
+{
+	void *buf;
+
+	if (c->block_data && c->block_size >= min_size) {
+		return 0;
+	}
+
+	buf = avro_realloc(c->block_data, c->block_size, min_size);
+	if (!buf) {
+		avro_set_error("Cannot allocate memory for %s", what);
+		return 1;
+	}
+	c->block_data = buf;
+	c->block_size = min_size;
+	return 0;
+}
+#endif
+
+/* Deflate codec: raw RFC 1951 data, as required by the Avro spec. */
+
+#ifdef DEFLATE_CODEC
+
+struct codec_data_deflate {
+	z_stream deflate;
+	z_stream inflate;
+};
+#define codec_data_deflate_stream(cd)	(&((struct codec_data_deflate *)(cd))->deflate)
+#define codec_data_inflate_stream(cd)	(&((struct codec_data_deflate *)(cd))->inflate)
+
+static int
+codec_deflate(avro_codec_t codec)
+{
+	z_stream *ds;
+	z_stream *is;
+
+	codec->name = "deflate";
+	codec->type = AVRO_CODEC_DEFLATE;
+	codec->block_size = 0;
+	codec->used_size = 0;
+	codec->block_data = NULL;
+	codec->codec_data = avro_new(struct codec_data_deflate);
+
+	if (!codec->codec_data) {
+		avro_set_error("Cannot allocate memory for zlib");
+		return 1;
+	}
+
+	ds = codec_data_deflate_stream(codec->codec_data);
+	is = codec_data_inflate_stream(codec->codec_data);
+
+	memset(ds, 0, sizeof(z_stream));
+	memset(is, 0, sizeof(z_stream));
+
+	ds->zalloc = is->zalloc = Z_NULL;
+	ds->zfree  = is->zfree  = Z_NULL;
+	ds->opaque = is->opaque = Z_NULL;
+
+	/* Negative window bits select raw deflate (no zlib header/trailer). */
+	if (deflateInit2(ds, Z_BEST_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
+		avro_freet(struct codec_data_deflate, codec->codec_data);
+		codec->codec_data = NULL;
+		avro_set_error("Cannot initialize zlib deflate");
+		return 1;
+	}
+
+	if (inflateInit2(is, -15) != Z_OK) {
+		/* Release the deflate state that was already set up. */
+		deflateEnd(ds);
+		avro_freet(struct codec_data_deflate, codec->codec_data);
+		codec->codec_data = NULL;
+		avro_set_error("Cannot initialize zlib inflate");
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Compress len bytes at data into c->block_data as one raw deflate
+ * stream; c->used_size receives the compressed length.
+ */
+static int encode_deflate(avro_codec_t c, void * data, int64_t len)
+{
+	int err;
+	z_stream *s = codec_data_deflate_stream(c->codec_data);
+
+	/* deflateBound() is the worst-case output size for this stream's
+	 * settings, so a single Z_FINISH call always completes. */
+	if (codec_reserve(c, (int64_t) deflateBound(s, (uLong) len), "deflate")) {
+		return 1;
+	}
+
+	c->used_size = 0;
+
+	s->next_in = (Bytef *) data;
+	s->avail_in = (uInt) len;
+
+	s->next_out = (Bytef *) c->block_data;
+	s->avail_out = (uInt) c->block_size;
+
+	s->total_out = 0;
+
+	err = deflate(s, Z_FINISH);
+	if (err != Z_STREAM_END) {
+		/* Reset (not end) the stream so later blocks can reuse it. */
+		deflateReset(s);
+		avro_set_error("Error compressing block with deflate (%i)", err);
+		return 1;
+	}
+
+	/* Only used_size changes: block_size must stay the capacity. */
+	c->used_size = s->total_out;
+
+	if (deflateReset(s) != Z_OK) {
+		avro_set_error("Cannot reset zlib deflate");
+		return 1;
+	}
+
+	return 0;
+}
+
+/*
+ * Inflate len bytes of raw deflate data into c->block_data, doubling the
+ * buffer whenever it fills up; c->used_size receives the decoded length.
+ */
+static int decode_deflate(avro_codec_t c, void * data, int64_t len)
+{
+	int err;
+	z_stream *s = codec_data_inflate_stream(c->codec_data);
+
+	if (codec_reserve(c, DEFAULT_BLOCK_SIZE, "deflate")) {
+		return 1;
+	}
+
+	c->used_size = 0;
+
+	s->next_in = (Bytef *) data;
+	s->avail_in = (uInt) len;
+
+	s->next_out = (Bytef *) c->block_data;
+	s->avail_out = (uInt) c->block_size;
+
+	s->total_out = 0;
+
+	for (;;) {
+		err = inflate(s, Z_FINISH);
+		if (err == Z_STREAM_END) {
+			break;
+		}
+
+		/* With Z_FINISH, a full output buffer is reported as
+		 * Z_BUF_ERROR.  Only grow when the buffer really is full:
+		 * Z_BUF_ERROR with room left means the input is truncated,
+		 * and growing would then loop forever. */
+		if ((err == Z_OK || err == Z_BUF_ERROR) && s->avail_out == 0) {
+			if (codec_reserve(c, c->block_size * 2, "deflate")) {
+				inflateReset(s);
+				return 1;
+			}
+			/* The buffer may have moved; resume after the bytes
+			 * already produced. */
+			s->next_out = (Bytef *) c->block_data + s->total_out;
+			s->avail_out = (uInt) (c->block_size - s->total_out);
+			continue;
+		}
+
+		/* Reset (not end) the stream so later blocks can reuse it. */
+		inflateReset(s);
+		avro_set_error("Error decompressing block with deflate (%i)", err);
+		return 1;
+	}
+
+	c->used_size = s->total_out;
+
+	if (inflateReset(s) != Z_OK) {
+		avro_set_error("Cannot reset zlib inflate");
+		return 1;
+	}
+
+	return 0;
+}
+
+/* Free the codec-owned output buffer and both zlib streams. */
+static int reset_deflate(avro_codec_t c)
+{
+	if (c->block_data) {
+		avro_free(c->block_data, c->block_size);
+	}
+	if (c->codec_data) {
+		deflateEnd(codec_data_deflate_stream(c->codec_data));
+		inflateEnd(codec_data_inflate_stream(c->codec_data));
+		avro_freet(struct codec_data_deflate, c->codec_data);
+	}
+
+	c->block_data = NULL;
+	c->block_size = 0;
+	c->used_size = 0;
+	c->codec_data = NULL;
+
+	return 0;
+}
+
+#endif // DEFLATE_CODEC
+
+/* LZMA codec: raw LZMA2 data, using the default preset. */
+
+#ifdef LZMA_CODEC
+
+struct codec_data_lzma {
+	lzma_filter filters[2];
+	lzma_options_lzma options;
+};
+#define codec_data_lzma_filters(cd)	(((struct codec_data_lzma *)(cd))->filters)
+#define codec_data_lzma_options(cd)	(&((struct codec_data_lzma *)(cd))->options)
+
+static int
+codec_lzma(avro_codec_t codec)
+{
+	lzma_options_lzma *opt;
+	lzma_filter *filters;
+
+	codec->name = "lzma";
+	codec->type = AVRO_CODEC_LZMA;
+	codec->block_size = 0;
+	codec->used_size = 0;
+	codec->block_data = NULL;
+	codec->codec_data = avro_new(struct codec_data_lzma);
+
+	if (!codec->codec_data) {
+		avro_set_error("Cannot allocate memory for lzma");
+		return 1;
+	}
+
+	/* lzma_lzma_preset() returns true when the preset is unsupported. */
+	opt = codec_data_lzma_options(codec->codec_data);
+	if (lzma_lzma_preset(opt, LZMA_PRESET_DEFAULT)) {
+		avro_freet(struct codec_data_lzma, codec->codec_data);
+		codec->codec_data = NULL;
+		avro_set_error("Cannot initialize lzma options");
+		return 1;
+	}
+
+	/* A single LZMA2 filter, terminated by LZMA_VLI_UNKNOWN. */
+	filters = codec_data_lzma_filters(codec->codec_data);
+	filters[0].id = LZMA_FILTER_LZMA2;
+	filters[0].options = opt;
+	filters[1].id = LZMA_VLI_UNKNOWN;
+	filters[1].options = NULL;
+
+	return 0;
+}
+
+/*
+ * Compress len bytes at data into codec->block_data with the raw LZMA2
+ * encoder; codec->used_size receives the compressed length.
+ */
+static int encode_lzma(avro_codec_t codec, void * data, int64_t len)
+{
+	lzma_ret ret;
+	size_t written = 0;
+	size_t bound;
+	lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);
+
+	/* lzma_stream_buffer_bound() bounds a whole .xz stream, which is
+	 * larger than the raw LZMA2 data written here, so it is a safe
+	 * output size for every block (it returns 0 if len is too big). */
+	bound = lzma_stream_buffer_bound((size_t) len);
+	if (bound == 0) {
+		avro_set_error("Block too large for lzma encoder");
+		return 1;
+	}
+	if (codec_reserve(codec, (int64_t) bound, "lzma encoder")) {
+		return 1;
+	}
+
+	codec->used_size = 0;
+
+	ret = lzma_raw_buffer_encode(filters, NULL, data, (size_t) len,
+		codec->block_data, &written, (size_t) codec->block_size);
+	if (ret != LZMA_OK) {
+		avro_set_error("Error in lzma encoder");
+		return 1;
+	}
+
+	codec->used_size = written;
+
+	return 0;
+}
+
+/*
+ * Decompress len bytes of raw LZMA2 data into codec->block_data,
+ * doubling the buffer until the whole block fits; codec->used_size
+ * receives the decoded length.
+ */
+static int decode_lzma(avro_codec_t codec, void * data, int64_t len)
+{
+	size_t read_pos;
+	size_t write_pos;
+	lzma_ret ret;
+	lzma_filter* filters = codec_data_lzma_filters(codec->codec_data);
+
+	if (codec_reserve(codec, DEFAULT_BLOCK_SIZE, "lzma decoder")) {
+		return 1;
+	}
+
+	codec->used_size = 0;
+
+	for (;;) {
+		/* lzma_raw_buffer_decode() only advances the positions on
+		 * success, so each attempt decodes from the start. */
+		read_pos = 0;
+		write_pos = 0;
+		ret = lzma_raw_buffer_decode(filters, NULL, data,
+			&read_pos, (size_t) len, codec->block_data, &write_pos,
+			(size_t) codec->block_size);
+		if (ret != LZMA_BUF_ERROR) {
+			break;
+		}
+
+		/* Output buffer too small: double it and try again. */
+		if (codec_reserve(codec, codec->block_size * 2, "lzma decoder")) {
+			return 1;
+		}
+	}
+
+	if (ret != LZMA_OK) {
+		avro_set_error("Error in lzma decoder");
+		return 1;
+	}
+
+	codec->used_size = write_pos;
+
+	return 0;
+}
+
+/* Free the codec-owned output buffer and the filter options. */
+static int reset_lzma(avro_codec_t c)
+{
+	if (c->block_data) {
+		avro_free(c->block_data, c->block_size);
+	}
+	if (c->codec_data) {
+		avro_freet(struct codec_data_lzma, c->codec_data);
+	}
+
+	c->block_data = NULL;
+	c->block_size = 0;
+	c->used_size = 0;
+	c->codec_data = NULL;
+
+	return 0;
+}
+
+#endif // LZMA_CODEC
+
+/* Common interface */
+
+/*
+ * Set up `codec` for the codec named `type`: "null", plus "deflate" and
+ * "lzma" when they were enabled at build time.  Returns 0 on success,
+ * or non-zero with an error message set.
+ */
+int avro_codec(avro_codec_t codec, const char *type)
+{
+	if (!type) {
+		avro_set_error("Codec name must not be NULL");
+		return 1;
+	}
+
+#ifdef DEFLATE_CODEC
+	if (strcmp("deflate", type) == 0) {
+		return codec_deflate(codec);
+	}
+#endif
+
+#ifdef LZMA_CODEC
+	if (strcmp("lzma", type) == 0) {
+		return codec_lzma(codec);
+	}
+#endif
+
+	if (strcmp("null", type) == 0) {
+		return codec_null(codec);
+	}
+
+	avro_set_error("Unknown codec %s", type);
+	return 1;
+}
+
+/* Encode one block; the result is in c->block_data[0 .. c->used_size). */
+int avro_codec_encode(avro_codec_t c, void * data, int64_t len)
+{
+	switch(c->type)
+	{
+	case AVRO_CODEC_NULL:
+		return encode_null(c, data, len);
+#ifdef DEFLATE_CODEC
+	case AVRO_CODEC_DEFLATE:
+		return encode_deflate(c, data, len);
+#endif
+#ifdef LZMA_CODEC
+	case AVRO_CODEC_LZMA:
+		return encode_lzma(c, data, len);
+#endif
+	default:
+		avro_set_error("Unsupported codec type %d", (int) c->type);
+		return 1;
+	}
+}
+
+/* Decode one block; the result is in c->block_data[0 .. c->used_size). */
+int avro_codec_decode(avro_codec_t c, void * data, int64_t len)
+{
+	switch(c->type)
+	{
+	case AVRO_CODEC_NULL:
+		return decode_null(c, data, len);
+#ifdef DEFLATE_CODEC
+	case AVRO_CODEC_DEFLATE:
+		return decode_deflate(c, data, len);
+#endif
+#ifdef LZMA_CODEC
+	case AVRO_CODEC_LZMA:
+		return decode_lzma(c, data, len);
+#endif
+	default:
+		avro_set_error("Unsupported codec type %d", (int) c->type);
+		return 1;
+	}
+}
+
+/* Release everything the codec owns and return it to an empty state. */
+int avro_codec_reset(avro_codec_t c)
+{
+	switch(c->type)
+	{
+	case AVRO_CODEC_NULL:
+		return reset_null(c);
+#ifdef DEFLATE_CODEC
+	case AVRO_CODEC_DEFLATE:
+		return reset_deflate(c);
+#endif
+#ifdef LZMA_CODEC
+	case AVRO_CODEC_LZMA:
+		return reset_lzma(c);
+#endif
+	default:
+		avro_set_error("Unsupported codec type %d", (int) c->type);
+		return 1;
+	}
+}

Added: avro/trunk/lang/c/src/codec.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/codec.h?rev=1221076&view=auto
==============================================================================
--- avro/trunk/lang/c/src/codec.h (added)
+++ avro/trunk/lang/c/src/codec.h Tue Dec 20 02:27:44 2011
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License. 
+ */
+
+#ifndef AVRO_CODEC_H
+#define	AVRO_CODEC_H
+
+#include <stdint.h>
+
+/* Block compression codecs known to the data file reader and writer. */
+enum avro_codec_type_t {
+	AVRO_CODEC_NULL,
+	AVRO_CODEC_DEFLATE,
+	AVRO_CODEC_LZMA
+};
+typedef enum avro_codec_type_t avro_codec_type_t;
+
+/*
+ * One codec instance.  After a successful encode or decode the output is
+ * block_data[0 .. used_size); the null codec points block_data straight
+ * at the caller's buffer instead of copying.
+ */
+struct avro_codec_t_ {
+	const char * name;		/* codec name, e.g. "deflate" */
+	avro_codec_type_t type;
+	int64_t block_size;
+	int64_t used_size;
+	void * block_data;
+	void * codec_data;		/* codec-private state, or NULL */
+};
+typedef struct avro_codec_t_* avro_codec_t;
+
+/* Initialize c for the codec named type; returns 0 on success. */
+int avro_codec(avro_codec_t c, const char *type);
+/* Free codec-owned resources; re-initialize c before using it again. */
+int avro_codec_reset(avro_codec_t c);
+/* Encode or decode len bytes at data into c->block_data. */
+int avro_codec_encode(avro_codec_t c, void * data, int64_t len);
+int avro_codec_decode(avro_codec_t c, void * data, int64_t len);
+
+#endif

Modified: avro/trunk/lang/c/src/datafile.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datafile.c?rev=1221076&r1=1221075&r2=1221076&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datafile.c (original)
+++ avro/trunk/lang/c/src/datafile.c Tue Dec 20 02:27:44 2011
@@ -21,6 +21,7 @@
 #include "avro/errors.h"
 #include "avro/value.h"
 #include "encoding.h"
+#include "codec.h"
 #include <stdio.h>
 #include <stdlib.h>
 #include <errno.h>
@@ -31,22 +32,29 @@
 struct avro_file_reader_t_ {
 	avro_schema_t writers_schema;
 	avro_reader_t reader;
+	avro_reader_t block_reader;
+	avro_codec_t codec;
 	char sync[16];
 	int64_t blocks_read;
 	int64_t blocks_total;
 	int64_t current_blocklen;
+	char * current_blockdata;
 };
 
 struct avro_file_writer_t_ {
 	avro_schema_t writers_schema;
 	avro_writer_t writer;
+	avro_codec_t codec;
 	char sync[16];
 	int block_count;
 	size_t block_size;
 	avro_writer_t datum_writer;
-	char datum_buffer[16 * 1024];
+	char* datum_buffer;
+	size_t datum_buffer_size;
 };
 
+#define DEFAULT_BLOCK_SIZE 16 * 1024
+
 /* TODO: should we just read /dev/random? */
 static void generate_sync(avro_file_writer_t w)
 {
@@ -82,7 +90,7 @@ static int write_header(avro_file_writer
 	check(rval, enc->write_string(w->writer, "avro.sync"));
 	check(rval, enc->write_bytes(w->writer, w->sync, sizeof(w->sync)));
 	check(rval, enc->write_string(w->writer, "avro.codec"));
-	check(rval, enc->write_bytes(w->writer, "null", 4));
+	check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
 	check(rval, enc->write_string(w->writer, "avro.schema"));
 	schema_writer = avro_writer_memory(schema_buf, sizeof(schema_buf));
 	rval = avro_schema_to_json(w->writers_schema, schema_writer);
@@ -115,7 +123,7 @@ file_writer_init_fp(const char *path, co
 }
 
 static int
-file_writer_create(const char *path, avro_schema_t schema, avro_file_writer_t w)
+file_writer_create(const char *path, avro_schema_t schema, avro_file_writer_t w, size_t block_size)
 {
 	int rval;
 	w->block_count = 0;
@@ -124,11 +132,21 @@ file_writer_create(const char *path, avr
 		check(rval, file_writer_init_fp(path, "w", w));
 	}
 
+	w->datum_buffer_size = block_size;
+	w->datum_buffer = avro_malloc(w->datum_buffer_size);
+
+	if(!w->datum_buffer) {
+		avro_set_error("Could not allocate datum buffer\n");
+		avro_writer_free(w->writer);
+		return ENOMEM;
+	}
+
 	w->datum_writer =
-	    avro_writer_memory(w->datum_buffer, sizeof(w->datum_buffer));
+	    avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
 	if (!w->datum_writer) {
 		avro_set_error("Cannot create datum writer for file %s", path);
 		avro_writer_free(w->writer);
+		avro_free(w->datum_buffer, w->datum_buffer_size);
 		return ENOMEM;
 	}
 
@@ -140,29 +158,62 @@ int
 avro_file_writer_create(const char *path, avro_schema_t schema,
 			avro_file_writer_t * writer)
 {
+	return avro_file_writer_create_with_codec(path, schema, writer, "null", 0);
+}
+
+/*
+ * Like avro_file_writer_create(), but compresses each block with the
+ * named codec.  A block_size of 0 selects DEFAULT_BLOCK_SIZE for the
+ * in-memory block (datum) buffer.
+ */
+int avro_file_writer_create_with_codec(const char *path,
+			avro_schema_t schema, avro_file_writer_t * writer,
+			const char *codec, size_t block_size)
+{
 	avro_file_writer_t w;
 	int rval;
 	check_param(EINVAL, path, "path");
 	check_param(EINVAL, is_avro_schema(schema), "schema");
 	check_param(EINVAL, writer, "writer");
+	check_param(EINVAL, codec, "codec");
+
+	if (block_size == 0) {
+		block_size = DEFAULT_BLOCK_SIZE;
+	}
 
 	w = avro_new(struct avro_file_writer_t_);
 	if (!w) {
 		avro_set_error("Cannot allocate new file writer");
 		return ENOMEM;
 	}
-	rval = file_writer_create(path, schema, w);
+	w->codec = avro_new(struct avro_codec_t_);
+	if (!w->codec) {
+		avro_set_error("Cannot allocate new codec");
+		avro_freet(struct avro_file_writer_t_, w);
+		return ENOMEM;
+	}
+	rval = avro_codec(w->codec, codec);
+	if (rval) {
+		avro_freet(struct avro_codec_t_, w->codec);
+		avro_freet(struct avro_file_writer_t_, w);
+		return rval;
+	}
+	rval = file_writer_create(path, schema, w, block_size);
 	if (rval) {
+		/* Undo the codec setup above before freeing the writer. */
+		avro_codec_reset(w->codec);
+		avro_freet(struct avro_codec_t_, w->codec);
 		avro_freet(struct avro_file_writer_t_, w);
 		return rval;
 	}
 	*writer = w;
+
 	return 0;
 }
 
 static int file_read_header(avro_reader_t reader,
-			    avro_schema_t * writers_schema, char *sync,
-			    int synclen)
+			    avro_schema_t * writers_schema, avro_codec_t codec,
+			    char *sync, int synclen)
 {
 	int rval;
 	avro_schema_t meta_schema;
@@ -170,6 +211,7 @@ static int file_read_header(avro_reader_
 	avro_value_iface_t *meta_iface;
 	avro_value_t meta;
 	char magic[4];
+	avro_value_t codec_val;
 	avro_value_t schema_bytes;
 	const void *p;
 	size_t len;
@@ -196,6 +238,35 @@ static int file_read_header(avro_reader_
 	}
 	avro_schema_decref(meta_schema);
 
+	rval = avro_value_get_by_name(&meta, "avro.codec", &codec_val, NULL);
+	if (rval) {
+		avro_set_error("File header doesn't contain a codec");
+		avro_value_decref(&meta);
+		return rval;
+	} else {
+		const void *buf;
+		size_t size;
+		char codec_name[11];
+
+		avro_type_t type = avro_value_get_type(&codec_val);
+
+		if (type != AVRO_BYTES) {
+			avro_set_error("Value type of codec is unexpected");
+			avro_value_decref(&meta);
+			return EILSEQ;
+		}
+
+		avro_value_get_bytes(&codec_val, &buf, &size);
+		memset(codec_name, 0, sizeof(codec_name));
+		strncpy(codec_name, buf, size < 10 ? size : 10);
+
+		if (avro_codec(codec, codec_name) != 0) {
+			avro_set_error("File header contains an unknown codec");
+			avro_value_decref(&meta);
+			return EILSEQ;
+		}
+	}
+
 	rval = avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL);
 	if (rval) {
 		avro_set_error("File header doesn't contain a schema");
@@ -235,7 +306,7 @@ static int file_writer_open(const char *
 		return ENOMEM;
 	}
 	rval =
-	    file_read_header(reader, &w->writers_schema, w->sync,
+	    file_read_header(reader, &w->writers_schema, w->codec, w->sync,
 			     sizeof(w->sync));
 	avro_reader_free(reader);
 	/* Position to end of file and get ready to write */
@@ -271,11 +342,40 @@ int avro_file_writer_open(const char *pa
 static int file_read_block_count(avro_file_reader_t r)
 {
 	int rval;
+	int64_t len;
 	const avro_encoding_t *enc = &avro_binary_encoding;
 	check_prefix(rval, enc->read_long(r->reader, &r->blocks_total),
 		     "Cannot read file block count: ");
-	check_prefix(rval, enc->read_long(r->reader, &r->current_blocklen),
+	check_prefix(rval, enc->read_long(r->reader, &len),
 		     "Cannot read file block size: ");
+
+	if (len < 0) {
+		avro_set_error("Invalid file block size %lld", (long long) len);
+		return EILSEQ;
+	}
+
+	/* current_blockdata only ever grows; current_blocklen tracks its
+	 * allocated size, which is what avro_realloc/avro_free expect.
+	 * On failure the old buffer is kept so close() can still free it. */
+	if (len > r->current_blocklen) {
+		char *buf = avro_realloc(r->current_blockdata, r->current_blocklen, len);
+		if (!buf) {
+			avro_set_error("Cannot allocate file block buffer");
+			return ENOMEM;
+		}
+		r->current_blockdata = buf;
+		r->current_blocklen = len;
+	}
+
+	check_prefix(rval, avro_read(r->reader, r->current_blockdata, len),
+		     "Cannot read file block: ");
+
+	/* Decompress into the codec's buffer and read records from there. */
+	check_prefix(rval, avro_codec_decode(r->codec, r->current_blockdata, len),
+		     "Cannot decode file block: ");
+
+	avro_reader_memory_set_source(r->block_reader, r->codec->block_data, r->codec->used_size);
+
 	r->blocks_read = 0;
 	return 0;
 }
@@ -302,15 +390,31 @@ int avro_file_reader_fp(FILE *fp, const 
 		avro_freet(struct avro_file_reader_t_, r);
 		return ENOMEM;
 	}
+	r->block_reader = avro_reader_memory(0, 0);
+	if (!r->block_reader) {
+		avro_set_error("Cannot allocate block reader for file %s", path);
+		avro_freet(struct avro_file_reader_t_, r);
+		return ENOMEM;
+	}
+
+	r->codec = avro_new(struct avro_codec_t_);
+	if (!r->codec) {
+		avro_set_error("Could not allocate codec for file %s", path);
+		avro_freet(struct avro_file_reader_t_, r);
+		return ENOMEM;
+	}
 
-	rval = file_read_header(r->reader, &r->writers_schema, r->sync,
-				sizeof(r->sync));
+	rval = file_read_header(r->reader, &r->writers_schema, r->codec,
+				r->sync, sizeof(r->sync));
 	if (rval) {
 		avro_reader_free(r->reader);
 		avro_freet(struct avro_file_reader_t_, r);
 		return rval;
 	}
 
+	r->current_blockdata = NULL;
+	r->current_blocklen = 0;
+
 	rval = file_read_block_count(r);
 	if (rval) {
 		avro_reader_free(r->reader);
@@ -350,11 +454,14 @@ static int file_write_block(avro_file_wr
 		/* Write the block count */
 		check_prefix(rval, enc->write_long(w->writer, w->block_count),
 			     "Cannot write file block count: ");
+		/* Encode the block */
+		check_prefix(rval, avro_codec_encode(w->codec, w->datum_buffer, w->block_size),
+			     "Cannot encode file block: ");
 		/* Write the block length */
-		check_prefix(rval, enc->write_long(w->writer, w->block_size),
+		check_prefix(rval, enc->write_long(w->writer, w->codec->used_size),
 			     "Cannot write file block size: ");
 		/* Write the block */
-		check_prefix(rval, avro_write(w->writer, w->datum_buffer, w->block_size),
+		check_prefix(rval, avro_write(w->writer, w->codec->block_data, w->codec->used_size),
 			     "Cannot write file block: ");
 		/* Write the sync marker */
 		check_prefix(rval, write_sync(w),
@@ -432,6 +539,9 @@ int avro_file_writer_close(avro_file_wri
 	check(rval, avro_file_writer_flush(w));
 	avro_writer_free(w->datum_writer);
 	avro_writer_free(w->writer);
+	avro_free(w->datum_buffer, w->datum_buffer_size);
+	avro_codec_reset(w->codec);
+	avro_freet(struct avro_codec_t_, w->codec);
 	avro_freet(struct avro_file_writer_t_, w);
 	return 0;
 }
@@ -446,7 +556,7 @@ int avro_file_reader_read(avro_file_read
 	check_param(EINVAL, datum, "datum");
 
 	check(rval,
-	      avro_read_data(r->reader, r->writers_schema, readers_schema,
+	      avro_read_data(r->block_reader, r->writers_schema, readers_schema,
 			     datum));
 	r->blocks_read++;
 
@@ -472,7 +582,7 @@ avro_file_reader_read_value(avro_file_re
 	check_param(EINVAL, r, "reader");
 	check_param(EINVAL, value, "value");
 
-	check(rval, avro_value_read(r->reader, value));
+	check(rval, avro_value_read(r->block_reader, value));
 	r->blocks_read++;
 
 	if (r->blocks_read == r->blocks_total) {
@@ -492,6 +602,12 @@ int avro_file_reader_close(avro_file_rea
 {
 	avro_schema_decref(reader->writers_schema);
 	avro_reader_free(reader->reader);
+	avro_reader_free(reader->block_reader);
+	avro_codec_reset(reader->codec);
+	avro_freet(struct avro_codec_t_, reader->codec);
+	if (reader->current_blockdata) {
+		avro_free(reader->current_blockdata, reader->current_blocklen);
+	}
 	avro_freet(struct avro_file_reader_t_, reader);
 	return 0;
 }