You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@parquet.apache.org by ga...@apache.org on 2023/08/10 05:48:05 UTC
[parquet-mr] branch master updated: PARQUET-2334: Allow the cat subcommand to take multiple files (#1132)
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new d8aaf93b9 PARQUET-2334: Allow the cat subcommand to take multiple files (#1132)
d8aaf93b9 is described below
commit d8aaf93b960693a7dab52b0dc33c786f33938a54
Author: Kengo Seki <se...@apache.org>
AuthorDate: Thu Aug 10 14:48:00 2023 +0900
PARQUET-2334: Allow the cat subcommand to take multiple files (#1132)
---
.../apache/parquet/cli/commands/CatCommand.java | 55 ++++++++++++----------
.../parquet/cli/commands/CatCommandTest.java | 29 ++++++++++++
2 files changed, 59 insertions(+), 25 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
index 7703e88ca..aa0ab5ef7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
@@ -29,7 +29,9 @@ import org.apache.parquet.cli.util.Expressions;
import org.slf4j.Logger;
import java.io.Closeable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.parquet.cli.util.Expressions.select;
@@ -58,35 +60,38 @@ public class CatCommand extends BaseCommand {
Preconditions.checkArgument(
sourceFiles != null && !sourceFiles.isEmpty(),
"Missing file name");
- Preconditions.checkArgument(sourceFiles.size() == 1,
- "Only one file can be given");
- final String source = sourceFiles.get(0);
-
- Schema schema = getAvroSchema(source);
- Schema projection = Expressions.filterSchema(schema, columns);
+ // Ensure all source files have the columns specified first
+ Map<String, Schema> schemas = new HashMap<>();
+ for (String source : sourceFiles) {
+ Schema schema = getAvroSchema(source);
+ schemas.put(source, Expressions.filterSchema(schema, columns));
+ }
- Iterable<Object> reader = openDataFile(source, projection);
- boolean threw = true;
- long count = 0;
- try {
- for (Object record : reader) {
- if (numRecords > 0 && count >= numRecords) {
- break;
+ for (String source : sourceFiles) {
+ Schema projection = schemas.get(source);
+ Iterable<Object> reader = openDataFile(source, projection);
+ boolean threw = true;
+ long count = 0;
+ try {
+ for (Object record : reader) {
+ if (numRecords > 0 && count >= numRecords) {
+ break;
+ }
+ if (columns == null || columns.size() != 1) {
+ console.info(String.valueOf(record));
+ } else {
+ console.info(String.valueOf(select(projection, record, columns.get(0))));
+ }
+ count += 1;
}
- if (columns == null || columns.size() != 1) {
- console.info(String.valueOf(record));
- } else {
- console.info(String.valueOf(select(projection, record, columns.get(0))));
+ threw = false;
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+ } finally {
+ if (reader instanceof Closeable) {
+ Closeables.close((Closeable) reader, threw);
}
- count += 1;
- }
- threw = false;
- } catch (RuntimeException e) {
- throw new RuntimeException("Failed on record " + count, e);
- } finally {
- if (reader instanceof Closeable) {
- Closeables.close((Closeable) reader, threw);
}
}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index 38055e6ac..cfc6b2baa 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -35,4 +35,33 @@ public class CatCommandTest extends ParquetFileTest {
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}
+
+ @Test
+ public void testCatCommandWithMultipleInput() throws IOException {
+ File file = parquetFile();
+ CatCommand command = new CatCommand(createLogger(), 0);
+ command.sourceFiles = Arrays.asList(file.getAbsolutePath(), file.getAbsolutePath());
+ command.setConf(new Configuration());
+ Assert.assertEquals(0, command.run());
+ }
+
+ @Test
+ public void testCatCommandWithSpecificColumns() throws IOException {
+ File file = parquetFile();
+ CatCommand command = new CatCommand(createLogger(), 0);
+ command.sourceFiles = Arrays.asList(file.getAbsolutePath());
+ command.columns = Arrays.asList(INT32_FIELD, INT64_FIELD);
+ command.setConf(new Configuration());
+ Assert.assertEquals(0, command.run());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCatCommandWithInvalidColumn() throws IOException {
+ File file = parquetFile();
+ CatCommand command = new CatCommand(createLogger(), 0);
+ command.sourceFiles = Arrays.asList(file.getAbsolutePath());
+ command.columns = Arrays.asList("invalid_field");
+ command.setConf(new Configuration());
+ command.run();
+ }
}