You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/04/22 21:59:42 UTC

svn commit: r1470691 - in /avro/trunk: ./ lang/java/tools/src/main/java/org/apache/avro/tool/

Author: cutting
Date: Mon Apr 22 19:59:42 2013
New Revision: 1470691

URL: http://svn.apache.org/r1470691
Log:
AVRO-867. Java: Enable command-line tools to read data files from any Hadoop FileSystem implementation.  Contributed by Vincenz Priesnitz.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetMetaTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetSchemaTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileWriteTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Util.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Apr 22 19:59:42 2013
@@ -28,6 +28,9 @@ Trunk (not yet released)
     AVRO-1288. Ruby: Add support for deflate codec in data files.
     (martinkl)
 
+    AVRO-867. Java: Enable command-line tools to read data files from
+    any Hadoop FileSystem implementation. (Vincenz Priesnitz via cutting)
+
   BUG FIXES
 
     AVRO-1296. Python: Fix schemas retrieved from protocol types

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java Mon Apr 22 19:59:42 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.avro.tool;
 
-import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -43,16 +42,9 @@ public class BinaryFragmentToJsonTool im
       err.println("Use '-' as binary_data_file for stdin.");
       return 1;
     }
-    Schema schema = Schema.parse(args.get(0));
-    InputStream input;
-    boolean needsClosing;
-    if (args.get(1).equals("-")) {
-      input = stdin;
-      needsClosing = false;
-    } else {
-      input = new FileInputStream(args.get(1));
-      needsClosing = true;
-    }
+    Schema schema = new Schema.Parser().parse(args.get(0));
+    InputStream input = Util.fileOrStdin(args.get(1), stdin);
+
     try {
       DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
       Object datum = reader.read(null,
@@ -66,9 +58,7 @@ public class BinaryFragmentToJsonTool im
       out.println();
       out.flush();
     } finally {
-      if (needsClosing) {
-        input.close();
-      }
+      Util.close(input);
     }
     return 0;
   }

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetMetaTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetMetaTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetMetaTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetMetaTool.java Mon Apr 22 19:59:42 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.avro.tool;
 
-import java.io.File;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -28,6 +27,7 @@ import joptsimple.OptionSpec;
 
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.FsInput;
 
 /** Reads a data file to get its metadata. */
 public class DataFileGetMetaTool implements Tool {
@@ -60,9 +60,9 @@ public class DataFileGetMetaTool impleme
       p.printHelpOn(err);
       return 1;
     }
+    FsInput in = Util.openSeekableFromFS(args.get(0));
     DataFileReader<Void> reader =
-      new DataFileReader<Void>(new File(args.get(0)),
-                               new GenericDatumReader<Void>());
+      new DataFileReader<Void>(in, new GenericDatumReader<Void>());
     if (keyName != null) {
       byte[] value = reader.getMeta(keyName);
       if (value != null) {

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetSchemaTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetSchemaTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetSchemaTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileGetSchemaTool.java Mon Apr 22 19:59:42 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.avro.tool;
 
-import java.io.File;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -46,7 +45,7 @@ public class DataFileGetSchemaTool imple
       return 1;
     }
     DataFileReader<Void> reader =
-      new DataFileReader<Void>(new File(args.get(0)),
+      new DataFileReader<Void>(Util.openSeekableFromFS(args.get(0)),
                                new GenericDatumReader<Void>());
     out.println(reader.getSchema().toString(true));
     return 0;

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java Mon Apr 22 19:59:42 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.avro.tool;
 
-import java.io.File;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -56,7 +55,7 @@ public class DataFileReadTool implements
 
     GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
     FileReader<Object> fileReader =
-      DataFileReader.openReader(new File(args.get(0)), reader);
+      DataFileReader.openReader(Util.openSeekableFromFS(args.get(0)), reader);
     try {
       Schema schema = fileReader.getSchema();
       DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileWriteTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileWriteTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileWriteTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileWriteTool.java Mon Apr 22 19:59:42 2013
@@ -19,8 +19,6 @@ package org.apache.avro.tool;
 
 import java.io.DataInputStream;
 import java.io.EOFException;
-import java.io.FileReader;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -84,11 +82,10 @@ public class DataFileWriteTool implement
         p.printHelpOn(err);
         return 1;
     }
-    if (schemafile != null) {
-        schemastr = readSchemaFromFile(schemafile);
-    }
+    Schema schema = (schemafile != null)
+        ? new Schema.Parser().parse(Util.openFromFS(schemafile))
+        : new Schema.Parser().parse(schemastr);
     
-    Schema schema = Schema.parse(schemastr);
     DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
 
     InputStream input = Util.fileOrStdin(nargs.get(0), stdin);
@@ -110,28 +107,9 @@ public class DataFileWriteTool implement
       }
       writer.close();
     } finally {
-      if (input != stdin) {
-        input.close();
-      }
+      Util.close(input);
     }
     return 0;
   }
 
-  public static String readSchemaFromFile(String schemafile) throws IOException {
-    String schemastr;
-    StringBuilder b = new StringBuilder();
-    FileReader r = new FileReader(schemafile);
-    try {
-        char[] buf = new char[64*1024];
-        for(;;) {
-            int read = r.read(buf);
-            if (read==-1) break;
-            b.append(buf, 0, read);
-        }
-        schemastr = b.toString();
-    } finally {
-        r.close();
-    }
-    return schemastr;
-  }
 }

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java Mon Apr 22 19:59:42 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.avro.tool;
 
-import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.util.List;
@@ -39,17 +38,8 @@ public class JsonToBinaryFragmentTool im
       err.println("Use '-' as json_data_file for stdin.");
       return 1;
     }
-    Schema schema = Schema.parse(args.get(0));
-    InputStream input;
-    boolean needsClosing;
-    if (args.get(1).equals("-")) {
-      input = stdin;
-      needsClosing = false;
-    } else {
-      input = new FileInputStream(args.get(1));
-      needsClosing = true;
-    }
-    
+    Schema schema = new Schema.Parser().parse(args.get(0));
+    InputStream input = Util.fileOrStdin(args.get(1), stdin);
     try {
     GenericDatumReader<Object> reader = 
         new GenericDatumReader<Object>(schema);
@@ -62,9 +52,7 @@ public class JsonToBinaryFragmentTool im
     writer.write(datum, e);
     e.flush();
     } finally {
-      if (needsClosing) {
-        input.close();
-      }
+      Util.close(input);
     }
     return 0;
   }

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java Mon Apr 22 19:59:42 2013
@@ -17,8 +17,6 @@
  */
 package org.apache.avro.tool;
 
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -65,13 +63,13 @@ public class RecodecTool implements Tool
     InputStream input = in;
     boolean inputNeedsClosing = false;
     if (nargs.size() > 0 && !nargs.get(0).equals("-")) {
-      input = new FileInputStream(nargs.get(0));
+      input = Util.openFromFS(nargs.get(0));
       inputNeedsClosing = true;
     }
     OutputStream output = out;
     boolean outputNeedsClosing = false;
     if (nargs.size() > 1 && !nargs.get(1).equals("-")) {
-      output = new FileOutputStream(nargs.get(1));
+      output = Util.createFromFS(nargs.get(1));
       outputNeedsClosing = true;
     }
 

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java Mon Apr 22 19:59:42 2013
@@ -55,8 +55,7 @@ public class ToTextTool implements Tool 
     OptionParser p = new OptionParser();
     OptionSet opts = p.parse(args.toArray(new String[0]));
     if (opts.nonOptionArguments().size() != 2) {
-      err.println("Expected 2 args: from_file to_file (local filenames," +
-      " Hadoop URI's, or '-' for stdin/stdout");
+      err.println("Expected 2 args: from_file to_file (filenames or '-' for stdin/stdout");
       p.printHelpOn(err);
       return 1;
     }
@@ -66,11 +65,12 @@ public class ToTextTool implements Tool 
 
     GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
     DataFileStream<Object> fileReader =
-      new DataFileStream<Object>(inStream, reader);
+        new DataFileStream<Object>(inStream, reader);
 
-    if (!fileReader.getSchema().equals(Schema.parse(TEXT_FILE_SCHEMA))) {
+    if (!fileReader.getSchema().equals(new Schema.Parser().parse(TEXT_FILE_SCHEMA))) {
       err.println("Avro file is not generic text schema");
       p.printHelpOn(err);
+      fileReader.close();
       return 1;
     }
     
@@ -79,9 +79,9 @@ public class ToTextTool implements Tool 
       outStream.write(outBuff.array());
       outStream.write(LINE_SEPARATOR);
     }
-    
-    outStream.close();
-    inStream.close();
+    fileReader.close();
+    Util.close(inStream);
+    Util.close(outStream);
     return 0;
   }
 

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Util.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Util.java?rev=1470691&r1=1470690&r2=1470691&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Util.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Util.java Mon Apr 22 19:59:42 2013
@@ -20,64 +20,110 @@ package org.apache.avro.tool;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.URI;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /** Static utility methods for tools. */
 class Util {
   /**
-   * Returns stdin if filename is "-", else opens the local or HDFS file
+   * Returns stdin if filename is "-", else opens the file in the owning filesystem
    * and returns an InputStream for it.
+   * Relative paths will be opened in the default filesystem. 
+   * @param filename The filename to be opened
    * @throws IOException 
    */
   static BufferedInputStream fileOrStdin(String filename, InputStream stdin) 
       throws IOException {
-    if (filename.equals("-")) {
-      return new BufferedInputStream(stdin);
-    } 
-    else {
-      String[] parts = filename.split(":");
-      if (parts.length > 1 && parts[0].equals("hdfs")) {
-        FileSystem fs = FileSystem.get(
-            URI.create(filename), new Configuration());
-        return new BufferedInputStream(fs.open(new Path(filename)));
-      } else {
-        return new BufferedInputStream(new FileInputStream(new File(filename)));
-      }
-    }
+    return new BufferedInputStream(filename.equals("-")
+        ? stdin
+        : openFromFS(filename));  
   }
   
   /**
-   * Returns stdout if filename is "-", else opens the local or HDFS file
+   * Returns stdout if filename is "-", else opens the file from the owning filesystem
    * and returns an OutputStream for it.
+   * Relative paths will be opened in the default filesystem.  
+   * @param filename The filename to be opened
    * @throws IOException 
    */
   static BufferedOutputStream fileOrStdout(String filename, OutputStream stdout) 
-  throws IOException {
-    if (filename.equals("-")) {
-      return new BufferedOutputStream(stdout);
-    } 
-    else {
-      String[] parts = filename.split(":");
-      if (parts.length > 1 && parts[0].equals("hdfs")) {
-        FileSystem fs = FileSystem.get(
-          URI.create(filename), new Configuration());
-        return new BufferedOutputStream(fs.create(new Path(filename)));
-      } else {
-        return new BufferedOutputStream(
-            new FileOutputStream(new File(filename)));
+      throws IOException {
+    return new BufferedOutputStream(filename.equals("-")
+        ? stdout
+        : createFromFS(filename));
+  }
+  
+  /**
+   * Returns an InputStream for the file using the owning filesystem,
+   * or the default if none is given.
+   * @param filename The filename to be opened
+   * @throws IOException 
+   */
+  static InputStream openFromFS(String filename) 
+      throws IOException {
+    Path p = new Path(filename);
+    return p.getFileSystem(new Configuration()).open(p);
+  }
+  
+  /**
+   * Returns a seekable FsInput using the owning filesystem, 
+   * or the default if none is given.
+   * @param filename The filename to be opened
+   * @throws IOException 
+   */
+  static FsInput openSeekableFromFS(String filename) 
+      throws IOException {       
+    return new FsInput(new Path(filename), new Configuration());
+  }
+  
+  /**
+   * Opens the file for writing in the owning filesystem,
+   * or the default if none is given.
+   * @param filename The filename to be opened.
+   * @return An OutputStream to the specified file.
+   * @throws IOException
+   */
+  static OutputStream createFromFS(String filename) 
+      throws IOException {
+    Path p = new Path(filename);
+    return new BufferedOutputStream(p.getFileSystem(new Configuration()).create(p));
+  }
+  
+  /**
+   * Closes the inputstream created from {@link Util#fileOrStdin}
+   * unless it is System.in.
+   * @param in The inputstream to be closed.
+   */
+  static void close(InputStream in) {
+    if (!System.in.equals(in)) {
+      try {
+        in.close();
+      } catch (IOException e) {
+        System.err.println("could not close InputStream " + in.toString());
+      }
+    }
+  }
+  
+  /**
+   * Closes the outputstream created from {@link Util#fileOrStdout}
+   * unless it is System.out.
+   * @param out The outputStream to be closed.
+   */
+  static void close(OutputStream out) {
+    if (!System.out.equals(out)) {
+      try {
+        out.close();
+      } catch (IOException e) {
+        System.err.println("could not close OutputStream " + out.toString());
       }
     }
   }