Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/12 23:27:25 UTC

svn commit: r1445416 - in /avro/trunk: CHANGES.txt lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java lang/java/tools/src/main/java/org/apache/avro/tool/Main.java lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java

Author: cutting
Date: Tue Feb 12 22:27:25 2013
New Revision: 1445416

URL: http://svn.apache.org/r1445416
Log:
AVRO-1250. Add a command-line tool to concatenate data files.  Contributed by Nick White.
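For context, a minimal sketch (not part of this commit) of driving the new tool
programmatically; the ConcatExample class name and the .avro file names are
illustrative, and the tools jar is assumed to be on the classpath. The equivalent
command-line form is "concat [input-file...] output-file" via the avro-tools
main class.

    import java.util.Arrays;

    import org.apache.avro.tool.ConcatTool;

    public class ConcatExample {
      public static void main(String[] argv) throws Exception {
        // run() returns 0 on success, or 1/2/3 if the input schemas,
        // non-reserved metadata or codecs differ (see ConcatTool below).
        int rc = new ConcatTool().run(
            System.in, System.out, System.err,
            Arrays.asList("part1.avro", "part2.avro", "combined.avro"));
        System.exit(rc);
      }
    }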

Added:
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java   (with props)
    avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1445416&r1=1445415&r2=1445416&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Feb 12 22:27:25 2013
@@ -9,6 +9,9 @@ Trunk (not yet released)
 
     AVRO-1229. Add support for booleans to Trevni. (cutting)
 
+    AVRO-1250. Add a command-line tool to concatenate data files.
+    (Nick White via cutting)
+
   IMPROVEMENTS
 
     AVRO-1211. Add MR guide to documentation. (Skye Wanderman-Milne via

Added: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java?rev=1445416&view=auto
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java (added)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java Tue Feb 12 22:27:25 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Tool to concatenate Avro files with the same schema and non-reserved
+ * metadata.
+ */
+public class ConcatTool implements Tool {
+  /**
+   * @return 0 for success, 1 if the schemas of the input files differ, 2 if
+   *         the non-reserved input metadata differs, 3 if the input files are
+   *         encoded with more than one codec.
+   */
+  @Override
+  public int run(InputStream in, PrintStream out, PrintStream err,
+      List<String> args) throws Exception {
+
+    if(args.isEmpty()) {
+      printHelp(out);
+      return 0;
+    }
+
+    OutputStream output = out;
+    if (args.size() > 1) {
+      output = Util.fileOrStdout(args.get(args.size() - 1), out);
+      args = args.subList(0, args.size() - 1);
+    }
+
+    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
+      new GenericDatumWriter<GenericRecord>());
+    Schema schema = null;
+    Map<String, byte[]> metadata = new TreeMap<String, byte[]>();
+    String inputCodec = null;
+
+    for (String inFile : args) {
+      InputStream input = Util.fileOrStdin(inFile, in);
+      DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
+        input, new GenericDatumReader<GenericRecord>());
+
+      if (schema == null) {
+        // this is the first file - set up the writer, and store the
+        // Schema & metadata we'll use.
+        schema = reader.getSchema();
+        for (String key : reader.getMetaKeys()) {
+          if (!DataFileWriter.isReservedMeta(key)) {
+            byte[] metadatum = reader.getMeta(key);
+            metadata.put(key, metadatum);
+            writer.setMeta(key, metadatum);
+          }
+        }
+        inputCodec = reader.getMetaString(DataFileConstants.CODEC);
+        if(inputCodec == null) {
+          inputCodec = DataFileConstants.NULL_CODEC;
+        }
+        writer.setCodec(CodecFactory.fromString(inputCodec));
+        writer.create(schema, output);
+      } else {
+        // check that we're appending to the same schema & metadata.
+        if (!schema.equals(reader.getSchema())) {
+          err.println("input files have different schemas");
+          reader.close();
+          return 1;
+        }
+        for (String key : reader.getMetaKeys()) {
+          if (!DataFileWriter.isReservedMeta(key)) {
+            byte[] metadatum = reader.getMeta(key);
+            byte[] writersMetadatum = metadata.get(key);
+            if(!Arrays.equals(metadatum, writersMetadatum)) {
+              err.println("input files have different non-reserved metadata");
+              reader.close();
+              return 2;
+            }
+          }
+        }
+        String thisCodec = reader.getMetaString(DataFileConstants.CODEC);
+        if(thisCodec == null) {
+          thisCodec = DataFileConstants.NULL_CODEC;
+        }
+        if (!inputCodec.equals(thisCodec)) {
+          err.println("input files have different codecs");
+          reader.close();
+          return 3;
+        }
+      }
+
+      writer.appendAllFrom(reader, /*recompress*/ false);
+      reader.close();
+    }
+
+    writer.close();
+    return 0;
+  }
+
+  private void printHelp(PrintStream out) {
+    out.println("concat [input-file...] output-file");
+    out.println();
+    out.println("Concatenates one or more input files into a new output file");
+    out.println("by appending the input blocks without decoding them. The input");
+    out.println("files must have the same schema, metadata and codec. If they");
+    out.println("do not the tool will return the following error codes:");
+    out.println("  1 if the schemas don't match");
+    out.println("  2 if the metadata doesn't match");
+    out.println("  3 if the codecs don't match");
+    out.println("If no input files are given stdin will be used. The tool");
+    out.println("0 on success. A dash ('-') can be given as an input file");
+    out.println("to use stdin, and as an output file to use stdout.");
+
+  }
+
+  @Override
+  public String getName() {
+    return "concat";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Concatenates avro files without re-compressing.";
+  }
+}

Propchange: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/ConcatTool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java?rev=1445416&r1=1445415&r2=1445416&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java Tue Feb 12 22:27:25 2013
@@ -45,6 +45,7 @@ public class Main {
         new DataFileGetSchemaTool(),
         new IdlTool(),
         new RecodecTool(),
+        new ConcatTool(),
         new RpcReceiveTool(),
         new RpcSendTool(),
         new RpcProtocolTool(),

Added: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java?rev=1445416&view=auto
==============================================================================
--- avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java (added)
+++ avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java Tue Feb 12 22:27:25 2013
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import static java.util.Arrays.asList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+public class TestConcatTool {
+  private static final int ROWS_IN_INPUT_FILES = 100000;
+  private static final CodecFactory DEFLATE = CodecFactory.deflateCodec(9);
+
+  private Object aDatum(Type ofType, int forRow) {
+    switch (ofType) {
+      case STRING:
+        return String.valueOf(forRow % 100);
+      case INT:
+        return forRow;
+      default:
+        throw new AssertionError("I can't generate data for this type");
+    }
+  }
+
+  private File generateData(String file, Type type, Map<String, String> metadata, CodecFactory codec) throws Exception {
+    File inputFile = AvroTestUtil.tempFile(getClass(), file);
+    inputFile.deleteOnExit();
+
+    Schema schema = Schema.create(type);
+    DataFileWriter<Object> writer = new DataFileWriter<Object>(
+              new GenericDatumWriter<Object>(schema));
+    for(Entry<String, String> metadatum : metadata.entrySet()) {
+        writer.setMeta(metadatum.getKey(), metadatum.getValue());
+    }
+    writer.setCodec(codec);
+    writer.create(schema, inputFile);
+
+    for (int i = 0; i < ROWS_IN_INPUT_FILES; i++) {
+      writer.append(aDatum(type, i));
+    }
+    writer.close();
+
+    return inputFile;
+  }
+
+  private CodecFactory getCodec(File output) throws Exception {
+      DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
+        new FileInputStream(output),
+        new GenericDatumReader<GenericRecord>());
+      String codec = reader.getMetaString(DataFileConstants.CODEC);
+      try {
+        return codec == null ? CodecFactory.nullCodec() : CodecFactory.fromString(codec);
+      } finally {
+        reader.close();
+      }
+  }
+
+  private int numRowsInFile(File output) throws Exception {
+    DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(
+      new FileInputStream(output),
+      new GenericDatumReader<GenericRecord>());
+    Iterator<GenericRecord> rows = reader.iterator();
+    int rowcount = 0;
+    while(rows.hasNext()) {
+      ++rowcount;
+      rows.next();
+    }
+    reader.close();
+    return rowcount;
+  }
+
+  @Test
+  public void testConcat() throws Exception {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put("myMetaKey", "myMetaValue");
+
+    File input1 = generateData("input1.avro", Type.STRING, metadata, DEFLATE);
+    File input2 = generateData("input2.avro", Type.STRING, metadata, DEFLATE);
+    File input3 = generateData("input3.avro", Type.STRING, metadata, DEFLATE);
+
+    File output = AvroTestUtil.tempFile(getClass(), "default-output.avro");
+    output.deleteOnExit();
+
+    List<String> args = asList(
+      input1.getAbsolutePath(),
+      input2.getAbsolutePath(),
+      input3.getAbsolutePath(),
+      output.getAbsolutePath());
+    int returnCode = new ConcatTool().run(
+      System.in,
+      System.out,
+      System.err,
+      args);
+    assertEquals(0, returnCode);
+
+    assertEquals(ROWS_IN_INPUT_FILES * 3, numRowsInFile(output));
+    assertEquals(getCodec(input1).getClass(), getCodec(output).getClass());
+  }
+
+  @Test
+  public void testDifferentSchemasFail() throws Exception {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put("myMetaKey", "myMetaValue");
+
+    File input1 = generateData("input1.avro", Type.STRING, metadata, DEFLATE);
+    File input2 = generateData("input2.avro", Type.INT, metadata, DEFLATE);
+
+    File output = AvroTestUtil.tempFile(getClass(), "default-output.avro");
+    output.deleteOnExit();
+
+    List<String> args = asList(
+      input1.getAbsolutePath(),
+      input2.getAbsolutePath(),
+      output.getAbsolutePath());
+    int returnCode = new ConcatTool().run(
+      System.in,
+      System.out,
+      System.err,
+      args);
+    assertEquals(1, returnCode);
+  }
+
+  @Test
+  public void testDifferentMetadataFail() throws Exception {
+    Map<String, String> metadata1 = new HashMap<String, String>();
+    metadata1.put("myMetaKey", "myMetaValue");
+    Map<String, String> metadata2 = new HashMap<String, String>();
+    metadata2.put("myOtherMetaKey", "myOtherMetaValue");
+
+    File input1 = generateData("input1.avro", Type.STRING, metadata1, DEFLATE);
+    File input2 = generateData("input2.avro", Type.STRING, metadata2, DEFLATE);
+
+    File output = AvroTestUtil.tempFile(getClass(), "default-output.avro");
+    output.deleteOnExit();
+
+    List<String> args = asList(
+      input1.getAbsolutePath(),
+      input2.getAbsolutePath(),
+      output.getAbsolutePath());
+    int returnCode = new ConcatTool().run(
+      System.in,
+      System.out,
+      System.err,
+      args);
+    assertEquals(2, returnCode);
+  }
+
+  @Test
+  public void testDifferentCodecFail() throws Exception {
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put("myMetaKey", "myMetaValue");
+
+    File input1 = generateData("input1.avro", Type.STRING, metadata, DEFLATE);
+    File input2 = generateData("input2.avro", Type.STRING, metadata, CodecFactory.nullCodec());
+
+    File output = AvroTestUtil.tempFile(getClass(), "default-output.avro");
+    output.deleteOnExit();
+
+    List<String> args = asList(
+      input1.getAbsolutePath(),
+      input2.getAbsolutePath(),
+      output.getAbsolutePath());
+    int returnCode = new ConcatTool().run(
+      System.in,
+      System.out,
+      System.err,
+      args);
+    assertEquals(3, returnCode);
+  }
+
+  @Test
+  public void testHelpfulMessageWhenNoArgsGiven() throws Exception {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
+    PrintStream out = new PrintStream(buffer);
+    int returnCode = new ConcatTool().run(
+      System.in,
+      out,
+      System.err,
+      Collections.<String>emptyList());
+    out.close(); // flushes too
+
+    assertEquals(0, returnCode);
+    assertTrue(
+      "should have lots of help",
+      buffer.toString().trim().length() > 200);
+  }
+}

Propchange: avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestConcatTool.java
------------------------------------------------------------------------------
    svn:eol-style = native