Posted to commits@avro.apache.org by ph...@apache.org on 2011/06/20 09:02:29 UTC

svn commit: r1137526 - in /avro/trunk: CHANGES.txt lang/py/build.xml lang/py/scripts/ lang/py/scripts/avro lang/py/setup.py lang/py/test/test_script.py

Author: philz
Date: Mon Jun 20 07:02:28 2011
New Revision: 1137526

URL: http://svn.apache.org/viewvc?rev=1137526&view=rev
Log:
AVRO-836. Python "avro" commandline utility to display and write Avro files.
Contributed by Miki Tebeka.
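
For context, the new utility builds on the existing Python
DataFileReader/DataFileWriter API. A minimal sketch of that underlying
read/write pattern (Python 2, matching the script below; the
"looney.avsc"/"looney.avro" file names and the record values are
illustrative, borrowed from the test data further down, not part of this
commit):

    import json
    import avro.schema
    from avro.io import DatumReader, DatumWriter
    from avro.datafile import DataFileReader, DataFileWriter

    # Write: parse a schema, then append records to a container file.
    schema = avro.schema.parse(open("looney.avsc", "rb").read())
    writer = DataFileWriter(open("looney.avro", "wb"), DatumWriter(), schema)
    writer.append({"first": "daffy", "last": "duck", "type": "duck"})
    writer.close()

    # Read: iterate over the records; "avro cat" prints each one as JSON.
    reader = DataFileReader(open("looney.avro", "rb"), DatumReader())
    for record in reader:
        print(json.dumps(record))
    reader.close()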


Added:
    avro/trunk/lang/py/scripts/
    avro/trunk/lang/py/scripts/avro
    avro/trunk/lang/py/test/test_script.py
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/py/build.xml
    avro/trunk/lang/py/setup.py

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1137526&r1=1137525&r2=1137526&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Jun 20 07:02:28 2011
@@ -8,6 +8,9 @@ Avro 1.6.0 (unreleased)
 
   IMPROVEMENTS
 
+    AVRO-836. Python "avro" commandline utility to display and write Avro files.
+    (Miki Tebeka via philz)
+
     AVRO-833. Don't require simplejson for python >= 2.6.
     (Miki Tebeka via philz)
 

Modified: avro/trunk/lang/py/build.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/build.xml?rev=1137526&r1=1137525&r2=1137526&view=diff
==============================================================================
--- avro/trunk/lang/py/build.xml (original)
+++ avro/trunk/lang/py/build.xml Mon Jun 20 07:02:28 2011
@@ -98,6 +98,20 @@
       </filterset>
     </copy>
 
+     <!-- Inline the Avro version -->
+     <copy file="${basedir}/scripts/avro"
+           toFile="${build.dir}/scripts/avro"
+           overwrite="true">
+       <filterset>
+         <filter token="AVRO_VERSION" value="${avro.version}"/>
+       </filterset>
+     </copy>
+     <!-- Make executable (Ant does not preserve the executable bit) -->
+     <exec executable="chmod">
+         <arg value="a+x" />
+         <arg value="${build.dir}/scripts/avro" />
+     </exec>
+
     <!-- Inline the interop data directory -->
     <copy file="${test.dir}/test_datafile_interop.py"
           toFile="${build.dir}/test/test_datafile_interop.py"

Added: avro/trunk/lang/py/scripts/avro
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/scripts/avro?rev=1137526&view=auto
==============================================================================
--- avro/trunk/lang/py/scripts/avro (added)
+++ avro/trunk/lang/py/scripts/avro Mon Jun 20 07:02:28 2011
@@ -0,0 +1,243 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Command line utlity for reading and writing Avro files."""
+
+from avro.io import DatumReader, DatumWriter
+from avro.datafile import DataFileReader, DataFileWriter
+import avro.schema
+
+try:
+    import json
+except ImportError:
+    import simplejson as json
+import csv
+from sys import stdout, stdin
+from itertools import ifilter, imap
+from functools import partial
+from os.path import splitext
+
+class AvroError(Exception):
+    pass
+
+def print_json(row):
+    print(json.dumps(row))
+
+def print_json_pretty(row):
+    print(json.dumps(row, indent=4))
+
+_write_row = csv.writer(stdout).writerow
+_encoding = stdout.encoding or "UTF-8"
+def _encode(v, encoding=_encoding):
+    # Only string values need encoding; pass other types through unchanged
+    if not isinstance(v, basestring):
+        return v
+    return v.encode(encoding)
+
+def print_csv(row):
+    # We sort the keys so the fields will always be in the same order
+    # FIXME: Do we want to do it in schema order?
+    _write_row([_encode(row[key]) for key in sorted(row)])
+
+def select_printer(format):
+    return {
+        "json" : print_json,
+        "json-pretty" : print_json_pretty,
+        "csv" : print_csv
+    }[format]
+
+def record_match(expr, record):
+    return eval(expr, None, {"r" : record})
+
+def print_avro(avro, opts):
+    if opts.header and (opts.format != "csv"):
+        raise AvroError("--header applies only to CSV format")
+
+    # Apply filter first
+    if opts.filter:
+        avro = ifilter(partial(record_match, opts.filter), avro)
+
+    for i in xrange(opts.skip):
+        try:
+            next(avro)
+        except StopIteration:
+            return
+
+    printer = select_printer(opts.format)
+    for i, record in enumerate(avro):
+        if i == 0 and opts.header:
+            _write_row(sorted(record.keys()))
+        if i >= opts.count:
+            break
+        printer(record)
+
+def print_schema(avro):
+    schema = avro.meta["avro.schema"]
+    # Pretty print
+    print json.dumps(json.loads(schema), indent=4)
+
+def cat(opts, args):
+    if not args:
+        raise AvroError("No files to show")
+
+    for filename in args:
+        try:
+            fo = open(filename, "rb")
+        except (OSError, IOError), e:
+            raise AvroError("Can't open %s - %s" % (filename, e))
+
+        avro = DataFileReader(fo, DatumReader())
+
+        if opts.print_schema:
+            print_schema(avro)
+            continue
+
+        print_avro(avro, opts)
+
+def _open(filename, mode):
+    if filename == "-":
+        return {
+            "rb" : stdin,
+            "wb" : stdout
+        }[mode]
+
+    return open(filename, mode)
+
+def iter_json(info, _):
+    return imap(json.loads, info)
+
+_CONVERTERS = {
+    "int" : int,
+    "long" : long,
+    "float" : float,
+    "double" : float,
+    "string" : str,
+    "bytes" : str,
+    "boolean" : bool,
+    "null" : lambda _: None,
+}
+
+def convert(value, field):
+    type = field.type.type
+    if type == "union":
+        return convert_union(value, field)
+
+    return _CONVERTERS[type](value)
+
+def convert_union(value, field):
+    # Try each schema in the union until a conversion succeeds
+    for schema in field.type.schemas:
+        try:
+            return _CONVERTERS[schema.type](value)
+        except (KeyError, ValueError):
+            continue
+
+def iter_csv(info, schema):
+    header = [field.name for field in schema.fields]
+    for row in csv.reader(info):
+        values = [convert(v, f) for v, f in zip(row, schema.fields)]
+        yield dict(zip(header, values))
+
+def guess_input_type(files):
+    if not files:
+        return None
+
+    ext = splitext(files[0])[1].lower()
+    if ext in (".json", ".js"):
+        return "json"
+    elif ext in (".csv",):
+        return "csv"
+
+    return None
+
+def write(opts, files):
+    if not opts.schema:
+        raise AvroError("No schema specified")
+
+    input_type = opts.input_type or guess_input_type(files)
+    if not input_type:
+        raise AvroError("Can't guess input file type (not .json or .csv)")
+
+    try:
+        schema = avro.schema.parse(open(opts.schema, "rb").read())
+        out = _open(opts.output, "wb")
+    except (IOError, OSError), e:
+        raise AvroError("Can't open file - %s" % e)
+
+    writer = DataFileWriter(out, DatumWriter(), schema)
+
+    iter_records = {"json" : iter_json, "csv" : iter_csv}[input_type]
+    for filename in (files or ["-"]):
+        info = _open(filename, "rb")
+        for record in iter_records(info, schema):
+            writer.append(record)
+
+    writer.close()
+
+def main(argv=None):
+    import sys
+    from optparse import OptionParser, OptionGroup
+
+    argv = argv or sys.argv
+
+    parser = OptionParser(description="Display/write for Avro files",
+                      version="@AVRO_VERSION@",
+                      usage="usage: %prog cat|write [options] FILE [FILE...]")
+    # cat options
+
+    cat_options = OptionGroup(parser, "cat options")
+    cat_options.add_option("-n", "--count", default=float("Infinity"),
+                    help="number of records to print", type=int)
+    cat_options.add_option("-s", "--skip", help="number of records to skip",
+                           type=int, default=0)
+    cat_options.add_option("-f", "--format", help="record format",
+                           default="json",
+                           choices=["json", "csv", "json-pretty"])
+    cat_options.add_option("--header", help="print CSV header", default=False,
+                   action="store_true")
+    cat_options.add_option("--filter", help="filter records (e.g. r['age']>1)",
+                    default=None)
+    cat_options.add_option("--print-schema", help="print schema",
+                      action="store_true", default=False)
+    parser.add_option_group(cat_options)
+
+    # write options
+    write_options = OptionGroup(parser, "write options")
+    write_options.add_option("--schema", help="schema file (required)")
+    write_options.add_option("--input-type",
+                             help="input file(s) type (json or csv)",
+                             choices=["json", "csv"], default=None)
+    write_options.add_option("-o", "--output", help="output file", default="-")
+    parser.add_option_group(write_options)
+
+    opts, args = parser.parse_args(argv[1:])
+    if len(args) < 1:
+        parser.error("You much specify `cat` or `write`")  # Will exit
+
+    command = args.pop(0)
+    try:
+        if command == "cat":
+            cat(opts, args)
+        elif command == "write":
+            write(opts, args)
+        else:
+            raise AvroError("Unknown command - %s" % command)
+    except AvroError, e:
+        parser.error("%s" % e) # Will exit
+    except Exception, e:
+        raise SystemExit("panic: %s" % e)
+
+if __name__ == "__main__":
+    main()
+
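
The test module added below exercises this script end-to-end through
subprocess rather than by importing it. A minimal sketch of that invocation
pattern (assumes Python 2.7+ for subprocess.check_output; the script path and
the "looney.avro" file name are illustrative):

    from subprocess import check_output

    # Roughly equivalent to: avro cat looney.avro --format json-pretty -n 1
    out = check_output(["./scripts/avro", "cat", "looney.avro",
                        "--format", "json-pretty", "-n", "1"])
    print(out)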

Modified: avro/trunk/lang/py/setup.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/setup.py?rev=1137526&r1=1137525&r2=1137526&view=diff
==============================================================================
--- avro/trunk/lang/py/setup.py (original)
+++ avro/trunk/lang/py/setup.py Mon Jun 20 07:02:28 2011
@@ -31,6 +31,7 @@ setup(
   version = '@AVRO_VERSION@',
   packages = ['avro',],
   package_dir = {'avro': 'src/avro'},
+  scripts = ["./scripts/avro"],
 
   # Project uses simplejson, so ensure that it gets installed or upgraded
   # on the target machine

Added: avro/trunk/lang/py/test/test_script.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_script.py?rev=1137526&view=auto
==============================================================================
--- avro/trunk/lang/py/test/test_script.py (added)
+++ avro/trunk/lang/py/test/test_script.py Mon Jun 20 07:02:28 2011
@@ -0,0 +1,238 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+import csv
+from cStringIO import StringIO
+try:
+    import json
+except ImportError:
+    import simplejson as json
+from tempfile import NamedTemporaryFile
+import avro.schema
+from avro.io import DatumWriter
+from avro.datafile import DataFileWriter
+from os.path import dirname, join, isfile
+from os import remove
+from operator import itemgetter
+
+NUM_RECORDS = 7
+
+try:
+    from subprocess import check_output
+except ImportError:
+    from subprocess import Popen, PIPE
+
+    def check_output(args):
+        pipe = Popen(args, stdout=PIPE)
+        if pipe.wait() != 0:
+            raise ValueError
+        return pipe.stdout.read()
+
+try:
+    from subprocess import check_call
+except ImportError:
+    def check_call(args, **kw):
+        pipe = Popen(args, **kw)
+        assert pipe.wait() == 0
+
+SCHEMA = '''
+{
+    "namespace": "test.avro",
+        "name": "LooneyTunes",
+        "type": "record",
+        "fields": [
+            {"name": "first", "type": "string"},
+            {"name": "last", "type": "string"},
+            {"name": "type", "type": "string"}
+        ]
+}
+'''
+
+LOONIES = (
+    ("daffy", "duck", "duck"),
+    ("bugs", "bunny", "bunny"),
+    ("tweety", "", "bird"),
+    ("road", "runner", "bird"),
+    ("wile", "e", "coyote"),
+    ("pepe", "le pew", "skunk"),
+    ("foghorn", "leghorn", "rooster"),
+)
+
+def looney_records():
+    for f, l, t in LOONIES:
+        yield {"first": f, "last" : l, "type" : t}
+
+SCRIPT = join(dirname(__file__), "..", "scripts", "avro")
+
+_JSON_PRETTY = '''{
+    "type": "duck", 
+    "last": "duck", 
+    "first": "daffy"
+}'''
+
+def gen_avro(filename):
+    schema = avro.schema.parse(SCHEMA)
+    fo = open(filename, "wb")
+    writer = DataFileWriter(fo, DatumWriter(), schema)
+    for record in looney_records():
+        writer.append(record)
+    writer.close()
+    fo.close()
+
+def tempfile():
+    return NamedTemporaryFile(delete=False).name
+
+class TestCat(unittest.TestCase):
+    def setUp(self):
+        self.avro_file = tempfile()
+        gen_avro(self.avro_file)
+
+    def tearDown(self):
+        if isfile(self.avro_file):
+            remove(self.avro_file)
+
+    def _run(self, *args, **kw):
+        out = check_output([SCRIPT, "cat", self.avro_file] + list(args))
+        if kw.get("raw"):
+            return out
+        else:
+            return out.splitlines()
+
+    def test_print(self):
+        assert len(self._run()) == NUM_RECORDS
+
+    def test_filter(self):
+        return len(self._run("--filter", "r['type']=='bird'")) == 2
+
+    def test_skip(self):
+        skip = 3
+        return len(self._run("--skip", str(skip))) == NUM_RECORDS - skip
+
+    def test_csv(self):
+        reader = csv.reader(StringIO(self._run("-f", "csv", raw=True)))
+        assert len(list(reader)) == NUM_RECORDS
+
+    def test_csv_header(self):
+        io = StringIO(self._run("-f", "csv", "--header", raw=True))
+        reader = csv.DictReader(io)
+        r = {"type": "duck", "last": "duck", "first": "daffy"}
+        assert next(reader) == r
+
+    def test_print_schema(self):
+        out = self._run("--print-schema", raw=True)
+        assert json.loads(out)["namespace"] == "test.avro"
+
+    def test_help(self):
+        # Just see we have these
+        self._run("-h")
+        self._run("--help")
+
+    def test_json_pretty(self):
+        out = self._run("--format", "json-pretty", "-n", "1", raw=1)
+        assert out.strip() == _JSON_PRETTY.strip()
+
+    def test_version(self):
+        check_output([SCRIPT, "cat", "--version"])
+
+    def test_files(self):
+        out = self._run(self.avro_file)
+        assert len(out) == 2 * NUM_RECORDS
+
+class TestWrite(unittest.TestCase):
+    def setUp(self):
+        self.json_file = tempfile() + ".json"
+        fo = open(self.json_file, "w")
+        for record in looney_records():
+            json.dump(record, fo)
+            fo.write("\n")
+        fo.close()
+
+        self.csv_file = tempfile() + ".csv"
+        fo = open(self.csv_file, "w")
+        write = csv.writer(fo).writerow
+        get = itemgetter("first", "last", "type")
+        for record in looney_records():
+            write(get(record))
+        fo.close()
+
+        self.schema_file = tempfile()
+        fo = open(self.schema_file, "w")
+        fo.write(SCHEMA)
+        fo.close()
+
+    def tearDown(self):
+        for filename in (self.csv_file, self.json_file, self.schema_file):
+            try:
+                remove(filename)
+            except OSError:
+                continue
+
+    def _run(self, *args, **kw):
+        args = [SCRIPT, "write", "--schema", self.schema_file] + list(args)
+        check_call(args, **kw)
+
+    def load_avro(self, filename):
+        out = check_output([SCRIPT, "cat", filename])
+        return map(json.loads, out.splitlines())
+
+    def test_version(self):
+        check_call([SCRIPT, "write", "--version"])
+
+    def format_check(self, format, filename):
+        tmp = tempfile()
+        fo = open(tmp, "wb")
+        self._run(filename, "-f", format, stdout=fo)
+        fo.close()
+
+        records = self.load_avro(tmp)
+        assert len(records) == NUM_RECORDS
+        assert records[0]["first"] == "daffy"
+
+        remove(tmp)
+
+    def test_write_json(self):
+        self.format_check("json", self.json_file)
+
+    def test_write_csv(self):
+        self.format_check("csv", self.csv_file)
+
+    def test_outfile(self):
+        tmp = tempfile()
+        remove(tmp)
+        self._run(self.json_file, "-o", tmp)
+
+        assert len(self.load_avro(tmp)) == NUM_RECORDS
+        remove(tmp)
+
+    def test_multi_file(self):
+        tmp = tempfile()
+        fo = open(tmp, "wb")
+        self._run(self.json_file, self.json_file, stdout=fo)
+        fo.close()
+
+        assert len(self.load_avro(tmp)) == 2 * NUM_RECORDS
+        remove(tmp)
+
+    def test_stdin(self):
+        tmp = tempfile()
+
+        info = open(self.json_file, "rb")
+        fo = open(tmp, "wb")
+        self._run("--input-type", "json", stdin=info, stdout=fo)
+        fo.close()
+
+        assert len(self.load_avro(tmp)) == NUM_RECORDS
+        remove(tmp)