You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@avro.apache.org by dk...@apache.org on 2020/05/22 18:05:04 UTC

[avro] branch master updated: AVRO-1720: Add tool to count records in avro files

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 4318701  AVRO-1720: Add tool to count records in avro files
4318701 is described below

commit 4318701a20e8b0131374e63ccd263338ea57891c
Author: priesnit <vi...@researchgate.net>
AuthorDate: Mon May 11 22:02:37 2020 +0200

    AVRO-1720: Add tool to count records in avro files
---
 .../src/main/java/org/apache/avro/tool/Main.java   | 15 ++--
 .../java/org/apache/avro/tool/RecordCountTool.java | 89 +++++++++++++++++++
 .../org/apache/avro/tool/TestRecordCountTool.java  | 99 ++++++++++++++++++++++
 3 files changed, 196 insertions(+), 7 deletions(-)

diff --git a/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java b/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
index 96ad87b..59ae796 100644
--- a/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
+++ b/lang/java/tools/src/main/java/org/apache/avro/tool/Main.java
@@ -36,13 +36,14 @@ public class Main {
 
   Main() {
     tools = new TreeMap<>();
-    for (Tool tool : new Tool[] { new CatTool(), new SpecificCompilerTool(), new InduceSchemaTool(),
-        new JsonToBinaryFragmentTool(), new BinaryFragmentToJsonTool(), new CreateRandomFileTool(),
-        new DataFileReadTool(), new DataFileWriteTool(), new DataFileGetMetaTool(), new DataFileGetSchemaTool(),
-        new DataFileRepairTool(), new IdlTool(), new IdlToSchemataTool(), new RecodecTool(), new ConcatTool(),
-        new RpcReceiveTool(), new RpcSendTool(), new RpcProtocolTool(), new FromTextTool(), new ToTextTool(),
-        new ToTrevniTool(), new TetherTool(), new TrevniCreateRandomTool(), new TrevniMetadataTool(),
-        new TrevniToJsonTool(), new SchemaNormalizationTool(), new SchemaFingerprintTool() }) {
+    for (Tool tool : new Tool[] { new CatTool(), new RecordCountTool(), new SpecificCompilerTool(),
+        new InduceSchemaTool(), new JsonToBinaryFragmentTool(), new BinaryFragmentToJsonTool(),
+        new CreateRandomFileTool(), new DataFileReadTool(), new DataFileWriteTool(), new DataFileGetMetaTool(),
+        new DataFileGetSchemaTool(), new DataFileRepairTool(), new IdlTool(), new IdlToSchemataTool(),
+        new RecodecTool(), new ConcatTool(), new RpcReceiveTool(), new RpcSendTool(), new RpcProtocolTool(),
+        new FromTextTool(), new ToTextTool(), new ToTrevniTool(), new TetherTool(), new TrevniCreateRandomTool(),
+        new TrevniMetadataTool(), new TrevniToJsonTool(), new SchemaNormalizationTool(),
+        new SchemaFingerprintTool() }) {
       Tool prev = tools.put(tool.getName(), tool);
       if (prev != null) {
         throw new AssertionError("Two tools with identical names: " + tool + ", " + prev);
diff --git a/lang/java/tools/src/main/java/org/apache/avro/tool/RecordCountTool.java b/lang/java/tools/src/main/java/org/apache/avro/tool/RecordCountTool.java
new file mode 100644
index 0000000..2b6f516
--- /dev/null
+++ b/lang/java/tools/src/main/java/org/apache/avro/tool/RecordCountTool.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import com.google.common.collect.ImmutableList;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.fs.Path;
+
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Counts the records in Avro data files or folders and prints the total.
+ *
+ * <p>Counts are read from the block headers of each data file; block contents
+ * are skipped via {@link DataFileStream#nextBlock()} rather than decoded record
+ * by record.
+ */
+public class RecordCountTool implements Tool {
+
+  @Override
+  public String getName() {
+    return "count";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "Counts the records in avro files or folders";
+  }
+
+  /**
+   * Prints the total number of records across all inputs to {@code out}.
+   *
+   * <p>Each non-option argument names a data file or folder and is handed to
+   * {@code Util.getFiles}; a dash ({@code -}) reads one data file from
+   * {@code stdin} instead. With no arguments, usage is printed to {@code err}.
+   *
+   * @return always 0; failures are reported by throwing
+   */
+  @Override
+  public int run(InputStream stdin, PrintStream out, PrintStream err, List<String> args) throws Exception {
+    OptionParser optionParser = new OptionParser();
+    OptionSet optionSet = optionParser.parse(args.toArray(new String[0]));
+    // Unchecked cast: no typed non-option spec is registered, so the
+    // non-option arguments are expected to be the raw command-line Strings.
+    @SuppressWarnings("unchecked")
+    List<String> nargs = (List<String>) optionSet.nonOptionArguments();
+
+    if (nargs.isEmpty()) {
+      printHelp(err);
+      err.println();
+      optionParser.printHelpOn(err);
+      return 0;
+    }
+
+    // The help text advertises "-" as an input-file, so accept it next to real
+    // paths rather than only as the sole argument. stdin can be consumed only
+    // once, so it is counted a single time however often "-" appears.
+    List<String> paths = nargs.stream().filter(arg -> !"-".equals(arg)).collect(ImmutableList.toImmutableList());
+    boolean readStdin = paths.size() < nargs.size();
+
+    long count = readStdin ? countRecords(stdin) : 0L;
+    // Only consult the filesystem when real paths were given (not for a lone "-").
+    if (!paths.isEmpty()) {
+      for (Path file : Util.getFiles(paths)) {
+        try (InputStream inStream = Util.openFromFS(file)) {
+          count += countRecords(inStream);
+        }
+      }
+    }
+    out.println(count);
+    out.flush();
+    return 0;
+  }
+
+  /**
+   * Returns the number of records in a single Avro data file.
+   *
+   * <p>Sums {@code getBlockCount()} for each block and skips the block body with
+   * {@code nextBlock()}, so no datum is deserialized. The reader is closed before
+   * returning; NOTE(review): DataFileStream#close presumably also closes
+   * {@code inStream}, including stdin when the input is "-".
+   */
+  private long countRecords(InputStream inStream) throws java.io.IOException {
+    long count = 0L;
+    try (DataFileStream<Object> streamReader = new DataFileStream<>(inStream, new GenericDatumReader<>())) {
+      while (streamReader.hasNext()) {
+        count += streamReader.getBlockCount();
+        streamReader.nextBlock();
+      }
+    }
+    return count;
+  }
+
+  /** Writes the command-line usage of this tool to {@code ps}. */
+  private void printHelp(PrintStream ps) {
+    ps.println(getName() + " [input-files...]");
+    ps.println();
+    ps.println(getShortDescription());
+    ps.println("A dash ('-') can be given as an input-file to use stdin");
+  }
+}
diff --git a/lang/java/tools/src/test/java/org/apache/avro/tool/TestRecordCountTool.java b/lang/java/tools/src/test/java/org/apache/avro/tool/TestRecordCountTool.java
new file mode 100644
index 0000000..af3b887
--- /dev/null
+++ b/lang/java/tools/src/test/java/org/apache/avro/tool/TestRecordCountTool.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRecordCountTool {
+
+  @Rule
+  public TestName name = new TestName();
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  /**
+   * Writes {@code numRecords} copies of the string "foobar" to a fresh Avro data
+   * file in the temporary folder and returns that file.
+   */
+  private File generateData(int numRecords) throws Exception {
+    final File tempFile = temporaryFolder.newFile();
+
+    Schema schema = Schema.create(Type.STRING);
+    try (DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
+      // ~10 records per block: spreads the data over several blocks so that the
+      // per-block summing in RecordCountTool is exercised. Configured before create().
+      writer.setSyncInterval(60);
+      writer.create(schema, tempFile);
+      for (int i = 0; i < numRecords; i++) {
+        writer.append("foobar");
+      }
+    }
+    return tempFile;
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testFileDoesNotExist() throws Exception {
+    List<String> args = Collections
+        .singletonList(new File(temporaryFolder.getRoot(), "nonExistingFile").getAbsolutePath());
+    // run() must throw here, so there is no return code to check afterwards.
+    new RecordCountTool().run(System.in, System.out, System.err, args);
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    final List<Integer> inputSizes = IntStream.range(0, 20).boxed().collect(Collectors.toList());
+    for (Integer inputSize : inputSizes) {
+      File inputFile = generateData(inputSize);
+      List<String> args = Collections.singletonList(inputFile.getAbsolutePath());
+      final ByteArrayOutputStream out = new ByteArrayOutputStream();
+      int returnCode = new RecordCountTool().run(System.in, new PrintStream(out), System.err, args);
+
+      assertEquals(0, returnCode);
+      assertEquals(inputSize.toString() + System.lineSeparator(), out.toString());
+    }
+  }
+
+  @Test
+  public void testStdin() throws Exception {
+    File inputFile = generateData(25);
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (InputStream stdin = new FileInputStream(inputFile)) {
+      int returnCode = new RecordCountTool().run(stdin, new PrintStream(out), System.err,
+          Collections.singletonList("-"));
+      assertEquals(0, returnCode);
+    }
+    assertEquals("25" + System.lineSeparator(), out.toString());
+  }
+
+  @Test
+  public void testMultipleFiles() throws Exception {
+    File f1 = generateData(20);
+    File f2 = generateData(200);
+
+    List<String> args = Arrays.asList(f1.getAbsolutePath(), f2.getAbsolutePath());
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int returnCode = new RecordCountTool().run(System.in, new PrintStream(out), System.err, args);
+
+    assertEquals(0, returnCode);
+    assertEquals("220" + System.lineSeparator(), out.toString());
+  }
+
+}