You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ph...@apache.org on 2011/06/20 09:02:29 UTC
svn commit: r1137526 - in /avro/trunk: CHANGES.txt lang/py/build.xml
lang/py/scripts/ lang/py/scripts/avro lang/py/setup.py
lang/py/test/test_script.py
Author: philz
Date: Mon Jun 20 07:02:28 2011
New Revision: 1137526
URL: http://svn.apache.org/viewvc?rev=1137526&view=rev
Log:
AVRO-836. Python "avro" commandline utility to display and write Avro files.
Contributed by Miki Tebeka.
Added:
avro/trunk/lang/py/scripts/
avro/trunk/lang/py/scripts/avro
avro/trunk/lang/py/test/test_script.py
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/py/build.xml
avro/trunk/lang/py/setup.py
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1137526&r1=1137525&r2=1137526&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Jun 20 07:02:28 2011
@@ -8,6 +8,9 @@ Avro 1.6.0 (unreleased)
IMPROVEMENTS
+ AVRO-836. Python "avro" commandline utility to display and write Avro files.
+ (Miki Tebeka via philz)
+
AVRO-833. Don't require simplejson for python >= 2.6.
(Miki Tebeka via philz)
Modified: avro/trunk/lang/py/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/build.xml?rev=1137526&r1=1137525&r2=1137526&view=diff
==============================================================================
--- avro/trunk/lang/py/build.xml (original)
+++ avro/trunk/lang/py/build.xml Mon Jun 20 07:02:28 2011
@@ -98,6 +98,20 @@
</filterset>
</copy>
+ <!-- Inline the Avro version -->
+ <copy file="${basedir}/scripts/avro"
+ toFile="${build.dir}/scripts/avro"
+ overwrite="true">
+ <filterset>
+ <filter token="AVRO_VERSION" value="${avro.version}"/>
+ </filterset>
+ </copy>
+ <!-- Make executable (Ant does not preserve the executable bit) -->
+ <exec executable="chmod">
+ <arg value="a+x" />
+ <arg value="${build.dir}/scripts/avro" />
+ </exec>
+
<!-- Inline the interop data directory -->
<copy file="${test.dir}/test_datafile_interop.py"
toFile="${build.dir}/test/test_datafile_interop.py"
Added: avro/trunk/lang/py/scripts/avro
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/scripts/avro?rev=1137526&view=auto
==============================================================================
--- avro/trunk/lang/py/scripts/avro (added)
+++ avro/trunk/lang/py/scripts/avro Mon Jun 20 07:02:28 2011
@@ -0,0 +1,243 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Command line utility for reading and writing Avro files."""
+
+from avro.io import DatumReader, DatumWriter
+from avro.datafile import DataFileReader, DataFileWriter
+import avro.schema
+
+try:
+ import json
+except ImportError:
+ import simplejson as json
+import csv
+from sys import stdout, stdin
+from itertools import ifilter, imap
+from functools import partial
+from os.path import splitext
+
class AvroError(Exception):
    """User-facing error; ``main`` reports it through ``parser.error``."""
+
def print_json(row):
    """Write *row* to standard output as one line of JSON."""
    line = json.dumps(row)
    print(line)
+
def print_json_pretty(row):
    """Write *row* to standard output as JSON indented by four spaces."""
    text = json.dumps(row, indent=4)
    print(text)
+
+_write_row = csv.writer(stdout).writerow
+_encoding = stdout.encoding or "UTF-8"
+def _encode(v, encoding=_encoding):
+ if not isinstance(v, basestring):
+ return v
+ return v.encode(_encoding)
+
+def print_csv(row):
+ # We sort the keys to the fields will be in the same place
+ # FIXME: Do we want to do it in schema order?
+ _write_row([_encode(row[key]) for key in sorted(row)])
+
def select_printer(format):
    """Return the row-printing function for output *format*.

    Unknown formats raise KeyError; the ``choices`` list given to optparse
    for ``--format`` in ``main`` normally rules them out.
    """
    printers = {
        "json": print_json,
        "json-pretty": print_json_pretty,
        "csv": print_csv,
    }
    return printers[format]
+
def record_match(expr, record):
    """Evaluate the ``--filter`` expression *expr* with *record* bound to ``r``.

    NOTE(review): ``eval`` executes arbitrary code. This is acceptable only
    because *expr* comes from the command line of the user running the tool;
    never pass it untrusted input.
    """
    scope = {"r": record}
    return eval(expr, None, scope)
+
def print_avro(avro, opts):
    """Print records from the iterator *avro* according to the cat options.

    The steps run in this order:
      1. Apply ``opts.filter`` (if given).
      2. Drop ``opts.skip`` records.
      3. Print at most ``opts.count`` records using the printer for
         ``opts.format``.

    With ``opts.header``, the sorted keys of the first record remaining
    after the skip are written as a CSV header row. This happens even when
    ``opts.count`` is 0.

    Raises AvroError when ``--header`` is combined with a non-CSV format.
    """
    if opts.header and opts.format != "csv":
        raise AvroError("--header applies only to CSV format")

    records = avro
    if opts.filter:
        # Filter before skipping, so --skip counts matching records only.
        records = ifilter(partial(record_match, opts.filter), records)

    # Consume the records to skip; stop quietly if the input runs out.
    remaining = opts.skip
    while remaining > 0:
        try:
            next(records)
        except StopIteration:
            return
        remaining -= 1

    printer = select_printer(opts.format)
    for index, record in enumerate(records):
        if opts.header and index == 0:
            _write_row(sorted(record.keys()))
        if index >= opts.count:
            break
        printer(record)
+
def print_schema(avro):
    """Pretty-print the ``avro.schema`` metadata entry of reader *avro*."""
    schema = avro.meta["avro.schema"]
    # Round-trip through json to re-indent the stored schema text. Use the
    # print() call form, like the other printers, instead of the Python 2
    # print statement.
    print(json.dumps(json.loads(schema), indent=4))
+
def cat(opts, args):
    """Print each Avro data file named in *args* according to *opts*.

    With ``opts.print_schema`` only each file's schema is printed; otherwise
    its records are printed by ``print_avro``.

    Raises AvroError when *args* is empty or a file cannot be opened.
    """
    if not args:
        raise AvroError("No files to show")

    for filename in args:
        try:
            fo = open(filename, "rb")
        except (OSError, IOError) as e:
            raise AvroError("Can't open %s - %s" % (filename, e))

        # Close each file once it is printed, so that many file arguments do
        # not pile up open handles. ``reader`` (not ``avro``) avoids
        # shadowing the ``avro`` module.
        try:
            reader = DataFileReader(fo, DatumReader())
            if opts.print_schema:
                print_schema(reader)
            else:
                print_avro(reader, opts)
        finally:
            fo.close()
+
+def _open(filename, mode):
+ if filename == "-":
+ return {
+ "rb" : stdin,
+ "wb" : stdout
+ }[mode]
+
+ return open(filename, mode)
+
def iter_json(info, _):
    """Lazily decode one JSON document per line of *info*.

    The second argument (the schema) is unused. It keeps the signature
    aligned with ``iter_csv`` so that ``write`` can dispatch to either.
    """
    return (json.loads(line) for line in info)
+
try:
    _long = long  # Python 2 distinguishes int and long
except NameError:
    _long = int  # Python 3 has a single integer type


def _parse_boolean(value):
    """Parse a text cell as an Avro boolean.

    ``bool()`` cannot be used here: every non-empty string, "false"
    included, is truthy. Raises ValueError for unrecognised text.
    """
    if isinstance(value, bool):
        return value
    text = value.strip().lower()
    if text in ("true", "1"):
        return True
    if text in ("false", "0"):
        return False
    raise ValueError("Invalid boolean value - %r" % (value,))


def _parse_null(value):
    """Accept only an empty cell (or None) as null; anything else is ValueError."""
    if value is None or value == "":
        return None
    raise ValueError("Invalid null value - %r" % (value,))


# Text -> Python converters, keyed by Avro primitive type name.
_CONVERTERS = {
    "int": int,
    "long": _long,
    "float": float,
    "double": float,
    "string": str,
    "bytes": str,
    "boolean": _parse_boolean,
    "null": lambda _: None,
}


def _convert_schema(value, schema):
    """Convert text *value* according to *schema* (a primitive or a union)."""
    if schema.type == "union":
        return _convert_union_schema(value, schema)
    return _CONVERTERS[schema.type](value)


def _convert_union_schema(value, schema):
    """Convert *value* using the first branch of union *schema* that accepts it.

    Branches are tried in declaration order. Inside a union the null branch
    matches only an empty cell; otherwise it would swallow every value.
    Branches without a converter (e.g. records) are skipped.
    Raises ValueError when no branch accepts the value.
    """
    for branch in schema.schemas:
        if branch.type == "null":
            converter = _parse_null
        else:
            converter = _CONVERTERS.get(branch.type)
        if converter is None:
            continue
        try:
            return converter(value)
        except ValueError:
            continue
    raise ValueError("No union branch matches value - %r" % (value,))


def convert(value, field):
    """Convert the CSV text *value* to the Python type of record *field*.

    Raises KeyError for unsupported (non-primitive, non-union) field types,
    and ValueError when *value* cannot be parsed as the field's type.
    """
    return _convert_schema(value, field.type)


def convert_union(value, field):
    """Convert *value* for a *field* whose type is a union schema."""
    return _convert_union_schema(value, field.type)
+
def iter_csv(info, schema):
    """Yield one record dict per CSV row in *info*, typed per *schema*.

    Columns are matched to ``schema.fields`` by position. ``zip`` stops at
    the shorter side, so extra columns and missing trailing fields are
    dropped.
    """
    fields = schema.fields
    names = [field.name for field in fields]
    for row in csv.reader(info):
        yield dict(
            (name, convert(cell, field))
            for name, cell, field in zip(names, row, fields)
        )
+
def guess_input_type(files):
    """Infer the input format ("json" or "csv") from the first file's extension.

    Returns None when *files* is empty or the extension is not recognised.
    The extension match is case-insensitive.
    """
    if not files:
        return None
    known_types = {".json": "json", ".js": "json", ".csv": "csv"}
    extension = splitext(files[0])[1].lower()
    return known_types.get(extension)
+
def write(opts, files):
    """Write records read from *files* (JSON lines or CSV) to an Avro file.

    Options used:
      - ``opts.schema``: the schema file (required).
      - ``opts.input_type``: forces the input format; otherwise it is guessed
        from the first file name.
      - ``opts.output``: the destination ("-" for stdout).

    With no *files*, records are read from stdin.

    Raises AvroError for a missing schema option, an unknown input type, or
    a file that cannot be opened.
    """
    if not opts.schema:
        raise AvroError("No schema specified")

    input_type = opts.input_type or guess_input_type(files)
    if not input_type:
        raise AvroError("Can't guess input file type (not .json or .csv)")

    try:
        # Close the schema file promptly instead of leaking the handle.
        schema_file = open(opts.schema, "rb")
        try:
            schema = avro.schema.parse(schema_file.read())
        finally:
            schema_file.close()
        out = _open(opts.output, "wb")
    except (IOError, OSError) as e:
        raise AvroError("Can't open file - %s" % e)

    writer = DataFileWriter(out, DatumWriter(), schema)

    iter_records = {"json": iter_json, "csv": iter_csv}[input_type]
    for filename in (files or ["-"]):
        try:
            info = _open(filename, "rb")
        except (IOError, OSError) as e:
            # Report unreadable inputs like cat does, not as a "panic".
            raise AvroError("Can't open %s - %s" % (filename, e))
        try:
            for record in iter_records(info, schema):
                writer.append(record)
        finally:
            # Close each input once consumed, but never close stdin.
            if info is not stdin:
                info.close()

    writer.close()
+
def main(argv=None):
    """Entry point for the ``avro`` command line tool.

    *argv* defaults to ``sys.argv``. ``argv[0]`` is the program name and the
    first positional argument is the command (``cat`` or ``write``).

    Exits through ``parser.error`` on usage errors and on AvroError. Any
    other exception exits with a "panic" message.
    """
    import sys
    from optparse import OptionParser, OptionGroup

    argv = argv or sys.argv

    parser = OptionParser(description="Display/write for Avro files",
                          version="@AVRO_VERSION@",
                          usage="usage: %prog cat|write [options] FILE [FILE...]")

    # cat options
    cat_options = OptionGroup(parser, "cat options")
    # An infinite default count means "no limit" unless -n is given.
    cat_options.add_option("-n", "--count", default=float("Infinity"),
                           help="number of records to print", type=int)
    cat_options.add_option("-s", "--skip", help="number of records to skip",
                           type=int, default=0)
    cat_options.add_option("-f", "--format", help="record format",
                           default="json",
                           choices=["json", "csv", "json-pretty"])
    cat_options.add_option("--header", help="print CSV header", default=False,
                           action="store_true")
    cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
                           default=None)
    cat_options.add_option("--print-schema", help="print schema",
                           action="store_true", default=False)
    parser.add_option_group(cat_options)

    # write options
    write_options = OptionGroup(parser, "write options")
    write_options.add_option("--schema", help="schema file (required)")
    write_options.add_option("--input-type",
                             help="input file(s) type (json or csv)",
                             choices=["json", "csv"], default=None)
    write_options.add_option("-o", "--output", help="output file", default="-")
    parser.add_option_group(write_options)

    opts, args = parser.parse_args(argv[1:])
    if len(args) < 1:
        parser.error("You must specify `cat` or `write`")  # Will exit

    command = args.pop(0)
    try:
        if command == "cat":
            cat(opts, args)
        elif command == "write":
            write(opts, args)
        else:
            raise AvroError("Unknown command - %s" % command)
    except AvroError as e:
        parser.error("%s" % e)  # Will exit
    except Exception as e:
        raise SystemExit("panic: %s" % e)


if __name__ == "__main__":
    main()
+
Modified: avro/trunk/lang/py/setup.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/setup.py?rev=1137526&r1=1137525&r2=1137526&view=diff
==============================================================================
--- avro/trunk/lang/py/setup.py (original)
+++ avro/trunk/lang/py/setup.py Mon Jun 20 07:02:28 2011
@@ -31,6 +31,7 @@ setup(
version = '@AVRO_VERSION@',
packages = ['avro',],
package_dir = {'avro': 'src/avro'},
+ scripts = ["./scripts/avro"],
# Project uses simplejson, so ensure that it gets installed or upgraded
# on the target machine
Added: avro/trunk/lang/py/test/test_script.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_script.py?rev=1137526&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_script.py (added)
+++ avro/trunk/lang/py/test/test_script.py Mon Jun 20 07:02:28 2011
@@ -0,0 +1,238 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+import csv
+from cStringIO import StringIO
+try:
+ import json
+except ImportError:
+ import simplejson as json
+from tempfile import NamedTemporaryFile
+import avro.schema
+from avro.io import DatumWriter
+from avro.datafile import DataFileWriter
+from os.path import dirname, join, isfile
+from os import remove
+from operator import itemgetter
+
NUM_RECORDS = 7

try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7 has no check_output; provide a minimal equivalent.
    from subprocess import Popen, PIPE, CalledProcessError

    def check_output(args, **kw):
        """Run *args*; return its stdout, raising CalledProcessError on failure."""
        pipe = Popen(args, stdout=PIPE, **kw)
        # communicate() drains stdout while waiting. Calling wait() first can
        # deadlock once the child fills the pipe buffer.
        output = pipe.communicate()[0]
        if pipe.returncode != 0:
            raise CalledProcessError(pipe.returncode, args)
        return output

try:
    from subprocess import check_call
except ImportError:
    # Import Popen here as well: the fallback above only runs when
    # check_output is missing, so Popen may not be bound yet.
    from subprocess import Popen, CalledProcessError

    def check_call(args, **kw):
        """Run *args*; raise CalledProcessError on a non-zero exit status."""
        # Raise instead of assert, which is stripped under ``python -O``.
        status = Popen(args, **kw).wait()
        if status != 0:
            raise CalledProcessError(status, args)
        return status
+
SCHEMA = '''
{
 "namespace": "test.avro",
 "name": "LooneyTunes",
 "type": "record",
 "fields": [
 {"name": "first", "type": "string"},
 {"name": "last", "type": "string"},
 {"name": "type", "type": "string"}
 ]
}
'''

# (first, last, type) tuples, in the field order declared by SCHEMA.
LOONIES = (
    ("daffy", "duck", "duck"),
    ("bugs", "bunny", "bunny"),
    ("tweety", "", "bird"),
    ("road", "runner", "bird"),
    ("wile", "e", "coyote"),
    ("pepe", "le pew", "skunk"),
    ("foghorn", "leghorn", "rooster"),
)


def looney_records():
    """Yield each LOONIES tuple as a record dict matching SCHEMA."""
    keys = ("first", "last", "type")
    for values in LOONIES:
        yield dict(zip(keys, values))

# Path to the script under test, relative to this test file.
SCRIPT = join(dirname(__file__), "..", "scripts", "avro")
+
+_JSON_PRETTY = '''{
+ "type": "duck",
+ "last": "duck",
+ "first": "daffy"
+}'''
+
+def gen_avro(filename):
+ schema = avro.schema.parse(SCHEMA)
+ fo = open(filename, "wb")
+ writer = DataFileWriter(fo, DatumWriter(), schema)
+ for record in looney_records():
+ writer.append(record)
+ writer.close()
+ fo.close()
+
def tempfile():
    """Create an empty temporary file that outlives this call; return its path.

    The handle is closed explicitly instead of being left to garbage
    collection. Otherwise, on interpreters without reference counting, the
    file could stay open, and on Windows an open temporary file cannot be
    reopened by name.
    """
    handle = NamedTemporaryFile(delete=False)
    handle.close()
    return handle.name
+
class TestCat(unittest.TestCase):
    """Run ``avro cat`` against a freshly generated Avro file of LOONIES."""

    def setUp(self):
        self.avro_file = tempfile()
        gen_avro(self.avro_file)

    def tearDown(self):
        if isfile(self.avro_file):
            remove(self.avro_file)

    def _run(self, *args, **kw):
        """Run ``avro cat <avro_file> *args``.

        Returns the output split into lines, or the raw output when ``raw``
        is passed with a true value.
        """
        out = check_output([SCRIPT, "cat", self.avro_file] + list(args))
        if kw.get("raw"):
            return out
        else:
            return out.splitlines()

    # Tests must assert rather than return a comparison: unittest ignores
    # return values, so a returned False would still count as a pass.
    def test_print(self):
        self.assertEqual(len(self._run()), NUM_RECORDS)

    def test_filter(self):
        out = self._run("--filter", "r['type']=='bird'")
        self.assertEqual(len(out), 2)  # tweety and road runner

    def test_skip(self):
        skip = 3
        self.assertEqual(len(self._run("--skip", str(skip))),
                         NUM_RECORDS - skip)

    def test_csv(self):
        reader = csv.reader(StringIO(self._run("-f", "csv", raw=True)))
        self.assertEqual(len(list(reader)), NUM_RECORDS)

    def test_csv_header(self):
        buf = StringIO(self._run("-f", "csv", "--header", raw=True))
        reader = csv.DictReader(buf)
        r = {"type": "duck", "last": "duck", "first": "daffy"}
        self.assertEqual(next(reader), r)

    def test_print_schema(self):
        out = self._run("--print-schema", raw=True)
        self.assertEqual(json.loads(out)["namespace"], "test.avro")

    def test_help(self):
        # Just see we have these
        self._run("-h")
        self._run("--help")

    def test_json_pretty(self):
        out = self._run("--format", "json-pretty", "-n", "1", raw=1)
        # Compare parsed values: the key order json.dumps emits for a dict is
        # an implementation detail the test should not depend on.
        self.assertEqual(json.loads(out), json.loads(_JSON_PRETTY))
        # Pretty output spans several lines, unlike the plain json format.
        self.assertTrue(len(out.strip().splitlines()) > 1)

    def test_version(self):
        check_output([SCRIPT, "cat", "--version"])

    def test_files(self):
        # The same file is named twice, so every record is printed twice.
        out = self._run(self.avro_file)
        self.assertEqual(len(out), 2 * NUM_RECORDS)
+
class TestWrite(unittest.TestCase):
    """Run ``avro write`` on JSON and CSV inputs and read the results back."""

    def setUp(self):
        # Create the inputs with their suffix directly. Deriving the name as
        # tempfile() + ".json" left the unsuffixed temporary file behind.
        self.json_file = self._tempfile(".json")
        fo = open(self.json_file, "w")
        for record in looney_records():
            json.dump(record, fo)
            fo.write("\n")
        fo.close()

        self.csv_file = self._tempfile(".csv")
        fo = open(self.csv_file, "w")
        write = csv.writer(fo).writerow
        get = itemgetter("first", "last", "type")
        for record in looney_records():
            write(get(record))
        fo.close()

        self.schema_file = tempfile()
        fo = open(self.schema_file, "w")
        fo.write(SCHEMA)
        fo.close()

    @staticmethod
    def _tempfile(suffix):
        """Create an empty temporary file ending in *suffix*; return its path."""
        handle = NamedTemporaryFile(suffix=suffix, delete=False)
        handle.close()
        return handle.name

    def tearDown(self):
        for filename in (self.csv_file, self.json_file, self.schema_file):
            try:
                remove(filename)
            except OSError:
                continue

    def _run(self, *args, **kw):
        """Run ``avro write --schema <schema_file> *args``; *kw* goes to check_call."""
        args = [SCRIPT, "write", "--schema", self.schema_file] + list(args)
        check_call(args, **kw)

    def load_avro(self, filename):
        """Read *filename* back with ``avro cat`` and return its records."""
        out = check_output([SCRIPT, "cat", filename])
        return [json.loads(line) for line in out.splitlines()]

    def test_version(self):
        check_call([SCRIPT, "write", "--version"])

    def format_check(self, format, filename):
        """Convert *filename* with input type *format* and verify the output."""
        tmp = tempfile()
        try:
            fo = open(tmp, "wb")
            try:
                # --input-type is the write option. -f is cat's --format and
                # was ignored here, leaving the type to be guessed.
                self._run(filename, "--input-type", format, stdout=fo)
            finally:
                fo.close()

            records = self.load_avro(tmp)
            self.assertEqual(len(records), NUM_RECORDS)
            self.assertEqual(records[0]["first"], "daffy")
        finally:
            remove(tmp)

    def test_write_json(self):
        self.format_check("json", self.json_file)

    def test_write_csv(self):
        self.format_check("csv", self.csv_file)

    def test_outfile(self):
        tmp = tempfile()
        remove(tmp)  # let the script create the output file itself
        try:
            self._run(self.json_file, "-o", tmp)
            self.assertEqual(len(self.load_avro(tmp)), NUM_RECORDS)
        finally:
            if isfile(tmp):
                remove(tmp)

    def test_multi_file(self):
        tmp = tempfile()
        try:
            fo = open(tmp, "wb")
            try:
                self._run(self.json_file, self.json_file, stdout=fo)
            finally:
                fo.close()
            self.assertEqual(len(self.load_avro(tmp)), 2 * NUM_RECORDS)
        finally:
            remove(tmp)

    def test_stdin(self):
        tmp = tempfile()
        try:
            info = open(self.json_file, "rb")
            try:
                fo = open(tmp, "wb")
                try:
                    self._run("--input-type", "json", stdin=info, stdout=fo)
                finally:
                    fo.close()
            finally:
                info.close()
            self.assertEqual(len(self.load_avro(tmp)), NUM_RECORDS)
        finally:
            remove(tmp)