Posted to commits@parquet.apache.org by ga...@apache.org on 2020/12/14 10:30:42 UTC
[parquet-mr] branch master updated: PARQUET-1801: Add parquet-tools 'prune' to parquet-cli (#846)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 10df921 PARQUET-1801: Add parquet-tools 'prune' to parquet-cli (#846)
10df921 is described below
commit 10df92187a02180bbc2f7134ba8c83a13063ee3a
Author: Pavi Subenderan <ps...@uwaterloo.ca>
AuthorDate: Mon Dec 14 02:30:30 2020 -0800
PARQUET-1801: Add parquet-tools 'prune' to parquet-cli (#846)
---
.../src/main/java/org/apache/parquet/cli/Main.java | 3 +-
.../parquet/cli/commands/PruneColumnsCommand.java | 81 +++++++++++++++++
.../apache/parquet/hadoop/util/ColumnPruner.java | 54 ++---------
.../parquet/hadoop/util/ColumnPrunerTest.java | 41 ++++-----
.../parquet/tools/command/PruneColumnsCommand.java | 101 +--------------------
5 files changed, 108 insertions(+), 172 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index f29ff13..1b52a1c 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -33,6 +33,7 @@ import org.apache.parquet.cli.commands.ColumnSizeCommand;
import org.apache.parquet.cli.commands.ConvertCSVCommand;
import org.apache.parquet.cli.commands.ConvertCommand;
import org.apache.parquet.cli.commands.ParquetMetadataCommand;
+import org.apache.parquet.cli.commands.PruneColumnsCommand;
import org.apache.parquet.cli.commands.SchemaCommand;
import org.apache.parquet.cli.commands.ShowColumnIndexCommand;
import org.apache.parquet.cli.commands.ShowDictionaryCommand;
@@ -47,7 +48,6 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Level;
import org.apache.log4j.PropertyConfigurator;
import org.apache.parquet.cli.commands.TransCompressionCommand;
-import org.apache.parquet.hadoop.util.ColumnMasker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
@@ -94,6 +94,7 @@ public class Main extends Configured implements Tool {
jc.addCommand("head", new CatCommand(console, 10));
jc.addCommand("column-index", new ShowColumnIndexCommand(console));
jc.addCommand("column-size", new ColumnSizeCommand(console));
+ jc.addCommand("prune", new PruneColumnsCommand(console));
jc.addCommand("trans-compression", new TransCompressionCommand(console));
jc.addCommand("masking", new ColumnMaskingCommand(console));
}
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
new file mode 100644
index 0000000..a38b68f
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.util.ColumnPruner;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+@Parameters(commandDescription="Prune column(s) in a Parquet file and save it to a new file. " +
+ "The columns left are not changed.")
+public class PruneColumnsCommand extends BaseCommand {
+
+ public PruneColumnsCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(
+ names = {"-i", "--input"},
+ description = "<input parquet file path>",
+ required = false)
+ String input;
+
+ @Parameter(
+ names = {"-o", "--output"},
+ description = "<output parquet file path>",
+ required = false)
+ String output;
+
+ @Parameter(
+ names = {"-c", "--columns"},
+ description = "<columns to be replaced with masked value>",
+ required = false)
+ List<String> cols;
+
+ @Override
+ public int run() throws IOException {
+ Preconditions.checkArgument(input != null && output != null,
+ "Both input and output parquet file paths are required.");
+
+ Preconditions.checkArgument(cols != null && cols.size() > 0,
+ "columns cannot be null or empty");
+
+ Path inPath = new Path(input);
+ Path outPath = new Path(output);
+ ColumnPruner columnPruner = new ColumnPruner();
+ columnPruner.pruneColumns(getConf(), inPath, outPath, cols);
+ return 0;
+ }
+
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Removes specified columns and write to a new Parquet file",
+ " -i input.parquet -o output.parquet -c col1_name"
+ );
+ }
+}
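Once registered under "prune" in Main above, the new parquet-cli command is driven by the flags declared on the class. A rough invocation sketch follows; the 'parquet' launcher name is an assumption (it depends on how parquet-cli is packaged), the paths are placeholders, and nested columns use the case sensitive dot format expected by the pruner:

    parquet prune -i /tmp/input.parquet -o /tmp/output.parquet -c Links.Backward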
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
similarity index 76%
copy from parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
copy to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
index 2378f37..7ca9588 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
@@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.parquet.tools.command;
+package org.apache.parquet.hadoop.util;
-import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -27,63 +26,24 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class PruneColumnsCommand extends ArgsOnlyCommand {
+public class ColumnPruner {
- private static final Logger LOG = LoggerFactory.getLogger(PruneColumnsCommand.class);
-
- public static final String[] USAGE = new String[] {
- "<input> <output> [<column> ...]",
-
- "where <input> is the source parquet file",
- " <output> is the destination parquet file," +
- " [<column> ...] are the columns in the case sensitive dot format" +
- " to be pruned, for example a.b.c"
- };
-
- /**
- * Biggest number of columns we can prune.
- */
- private static final int MAX_COL_NUM = 100;
- private Configuration conf;
-
- public PruneColumnsCommand() {
- super(3, MAX_COL_NUM + 1);
-
- conf = new Configuration();
- }
-
- @Override
- public String[] getUsageDescription() {
- return USAGE;
- }
-
- @Override
- public String getCommandDescription() {
- return "Prune column(s) in a Parquet file and save it to a new file. " +
- "The columns left are not changed.";
- }
-
- @Override
- public void execute(CommandLine options) throws Exception {
- List<String> args = options.getArgList();
- Path inputFile = new Path(args.get(0));
- Path outputFile = new Path(args.get(1));
- List<String> cols = args.subList(2, args.size());
+ private static final Logger LOG = LoggerFactory.getLogger(ColumnPruner.class);
+ public void pruneColumns(Configuration conf, Path inputFile, Path outputFile, List<String> cols) throws IOException {
Set<ColumnPath> prunePaths = convertToColumnPaths(cols);
-
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, inputFile, ParquetMetadataConverter.NO_FILTER);
FileMetaData metaData = pmd.getFileMetaData();
MessageType schema = metaData.getSchema();
@@ -117,7 +77,6 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
}
private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
-
List<Type> fields = schema.getFields();
List<String> currentPath = new ArrayList<>();
List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
@@ -150,7 +109,7 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
if (prunedFields.size() > 0) {
prunedField = ((GroupType) field).withNewFields(prunedFields);
}
- }
+ }
}
currentPath.remove(fieldName);
@@ -165,4 +124,3 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
return prunePaths;
}
}
-
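The extracted ColumnPruner can also be driven directly from Java. A minimal sketch, mirroring the calls in ColumnPrunerTest below; the paths are placeholders and the column names ("Gender", "Links.Backward") are the ones used in the tests:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.ColumnPruner;

    public class ColumnPrunerSketch {
      public static void main(String[] args) throws Exception {
        // Drop the top-level "Gender" column and the nested "Links.Backward"
        // column; all remaining columns are copied to the output unchanged.
        ColumnPruner pruner = new ColumnPruner();
        pruner.pruneColumns(new Configuration(),
            new Path("/tmp/input.parquet"),   // placeholder input path
            new Path("/tmp/output.parquet"),  // placeholder output path
            Arrays.asList("Gender", "Links.Backward"));
      }
    }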
diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestPruneColumnsCommand.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
similarity index 89%
rename from parquet-tools/src/test/java/org/apache/parquet/tools/command/TestPruneColumnsCommand.java
rename to parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
index 764be54..ffe1ffa 100644
--- a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestPruneColumnsCommand.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
@@ -17,12 +17,8 @@
* under the License.
*/
-package org.apache.parquet.tools.command;
+package org.apache.parquet.hadoop.util;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
@@ -53,10 +49,10 @@ import static org.apache.parquet.schema.Type.Repetition.REPEATED;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
-public class TestPruneColumnsCommand {
+public class ColumnPrunerTest {
private final int numRecord = 1000;
- private PruneColumnsCommand command = new PruneColumnsCommand();
+ private ColumnPruner columnPruner = new ColumnPruner();
private Configuration conf = new Configuration();
@Test
@@ -65,9 +61,9 @@ public class TestPruneColumnsCommand {
String inputFile = createParquetFile("input");
String outputFile = createTempFile("output");
- // Remove column
- String cargs[] = {inputFile, outputFile, "Gender"};
- executeCommandLine(cargs);
+ // Remove column Gender
+ List<String> cols = Arrays.asList("Gender");
+ columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
// Verify the schema are not changed for the columns not pruned
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -95,7 +91,8 @@ public class TestPruneColumnsCommand {
// Remove columns
String cargs[] = {inputFile, outputFile, "Name", "Gender"};
- executeCommandLine(cargs);
+ List<String> cols = Arrays.asList("Name", "Gender");
+ columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
// Verify the schema are not changed for the columns not pruned
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -119,8 +116,8 @@ public class TestPruneColumnsCommand {
// Create Parquet file
String inputFile = createParquetFile("input");
String outputFile = createTempFile("output");
- String cargs[] = {inputFile, outputFile, "no_exist"};
- executeCommandLine(cargs);
+ List<String> cols = Arrays.asList("no_exist");
+ columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
}
@Test
@@ -130,8 +127,8 @@ public class TestPruneColumnsCommand {
String outputFile = createTempFile("output");
// Remove nested column
- String cargs[] = {inputFile, outputFile, "Links.Backward"};
- executeCommandLine(cargs);
+ List<String> cols = Arrays.asList("Links.Backward");
+ columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
// Verify the schema are not changed for the columns not pruned
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -158,8 +155,8 @@ public class TestPruneColumnsCommand {
String outputFile = createTempFile("output");
// Remove parent column. All of it's children will be removed.
- String cargs[] = {inputFile, outputFile, "Links"};
- executeCommandLine(cargs);
+ List<String> cols = Arrays.asList("Links");
+ columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
// Verify the schema are not changed for the columns not pruned
ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -180,14 +177,8 @@ public class TestPruneColumnsCommand {
// Create Parquet file
String inputFile = createParquetFile("input");
String outputFile = createTempFile("output");
- String cargs[] = {inputFile, outputFile, "Links.Not_exists"};
- executeCommandLine(cargs);
- }
-
- private void executeCommandLine(String[] cargs) throws Exception {
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(new Options(), cargs, command.supportsExtraArgs());
- command.execute(cmd);
+ List<String> cols = Arrays.asList("Links.Not_exists");
+ columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
}
private void validateColumns(String inputFile, List<String> prunePaths) throws IOException {
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
index 2378f37..548e989 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
@@ -21,28 +21,12 @@ package org.apache.parquet.tools.command;
import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.parquet.hadoop.util.ColumnPruner;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
public class PruneColumnsCommand extends ArgsOnlyCommand {
- private static final Logger LOG = LoggerFactory.getLogger(PruneColumnsCommand.class);
-
public static final String[] USAGE = new String[] {
"<input> <output> [<column> ...]",
@@ -81,88 +65,9 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
Path inputFile = new Path(args.get(0));
Path outputFile = new Path(args.get(1));
List<String> cols = args.subList(2, args.size());
+ ColumnPruner columnPruner = new ColumnPruner();
- Set<ColumnPath> prunePaths = convertToColumnPaths(cols);
-
- ParquetMetadata pmd = ParquetFileReader.readFooter(conf, inputFile, ParquetMetadataConverter.NO_FILTER);
- FileMetaData metaData = pmd.getFileMetaData();
- MessageType schema = metaData.getSchema();
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
-
- for (String col : cols) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inputFile.getName());
- }
- }
-
- ParquetFileWriter writer = new ParquetFileWriter(conf,
- pruneColumnsInSchema(schema, prunePaths), outputFile, ParquetFileWriter.Mode.CREATE);
-
- writer.start();
- writer.appendFile(HadoopInputFile.fromPath(inputFile, conf));
- writer.end(metaData.getKeyValueMetaData());
- }
-
- // We have to rewrite getPaths because MessageType only get level 0 paths
- private void getPaths(GroupType schema, List<String> paths, String parent) {
- List<Type> fields = schema.getFields();
- String prefix = (parent == null) ? "" : parent + ".";
- for (Type field : fields) {
- paths.add(prefix + field.getName());
- if (field instanceof GroupType) {
- getPaths(field.asGroupType(), paths, prefix + field.getName());
- }
- }
- }
-
- private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
-
- List<Type> fields = schema.getFields();
- List<String> currentPath = new ArrayList<>();
- List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
- MessageType newSchema = new MessageType(schema.getName(), prunedFields);
- return newSchema;
- }
-
- private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
- List<Type> prunedFields = new ArrayList<>();
- for (Type childField : fields) {
- Type prunedChildField = pruneColumnsInField(childField, currentPath, prunePaths);
- if (prunedChildField != null) {
- prunedFields.add(prunedChildField);
- }
- }
- return prunedFields;
- }
-
- private Type pruneColumnsInField(Type field, List<String> currentPath, Set<ColumnPath> prunePaths) {
- String fieldName = field.getName();
- currentPath.add(fieldName);
- ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
- Type prunedField = null;
- if (!prunePaths.contains(path)) {
- if (field.isPrimitive()) {
- prunedField = field;
- } else {
- List<Type> childFields = ((GroupType) field).getFields();
- List<Type> prunedFields = pruneColumnsInFields(childFields, currentPath, prunePaths);
- if (prunedFields.size() > 0) {
- prunedField = ((GroupType) field).withNewFields(prunedFields);
- }
- }
- }
-
- currentPath.remove(fieldName);
- return prunedField;
- }
-
- private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
- Set<ColumnPath> prunePaths = new HashSet<>();
- for (String col : cols) {
- prunePaths.add(ColumnPath.fromDotString(col));
- }
- return prunePaths;
+ columnPruner.pruneColumns(conf, inputFile, outputFile, cols);
}
}
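After this rewrite the parquet-tools front end keeps its positional interface ("<input> <output> [<column> ...]" per the USAGE string above) but delegates the actual pruning to the shared ColumnPruner, so both tools produce the same output file. A rough equivalent of the parquet-cli example, where the jar name and the java -jar launch are assumptions:

    java -jar parquet-tools.jar prune /tmp/input.parquet /tmp/output.parquet Links.Backward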