Posted to commits@parquet.apache.org by ga...@apache.org on 2020/12/14 10:30:42 UTC

[parquet-mr] branch master updated: PARQUET-1801: Add parquet-tools 'prune' to parquet-cli (#846)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 10df921  PARQUET-1801: Add parquet-tools 'prune' to parquet-cli (#846)
10df921 is described below

commit 10df92187a02180bbc2f7134ba8c83a13063ee3a
Author: Pavi Subenderan <ps...@uwaterloo.ca>
AuthorDate: Mon Dec 14 02:30:30 2020 -0800

    PARQUET-1801: Add parquet-tools 'prune' to parquet-cli (#846)
---
 .../src/main/java/org/apache/parquet/cli/Main.java |   3 +-
 .../parquet/cli/commands/PruneColumnsCommand.java  |  81 +++++++++++++++++
 .../apache/parquet/hadoop/util/ColumnPruner.java   |  54 ++---------
 .../parquet/hadoop/util/ColumnPrunerTest.java      |  41 ++++-----
 .../parquet/tools/command/PruneColumnsCommand.java | 101 +--------------------
 5 files changed, 108 insertions(+), 172 deletions(-)
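
This change extracts the column pruning logic out of the parquet-tools
PruneColumnsCommand into a reusable ColumnPruner class in parquet-hadoop,
registers a new 'prune' command in parquet-cli on top of it, and makes the
old parquet-tools command delegate to the shared implementation.

As a sketch of the intended command-line usage (the launcher name depends on
how parquet-cli is installed; 'parquet' is an assumption here, and columns
are given as case sensitive dot-separated paths, one -c per column):

    parquet prune -i input.parquet -o output.parquet -c col1_name
    parquet prune -i input.parquet -o output.parquet -c Name -c Links.Backward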

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index f29ff13..1b52a1c 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -33,6 +33,7 @@ import org.apache.parquet.cli.commands.ColumnSizeCommand;
 import org.apache.parquet.cli.commands.ConvertCSVCommand;
 import org.apache.parquet.cli.commands.ConvertCommand;
 import org.apache.parquet.cli.commands.ParquetMetadataCommand;
+import org.apache.parquet.cli.commands.PruneColumnsCommand;
 import org.apache.parquet.cli.commands.SchemaCommand;
 import org.apache.parquet.cli.commands.ShowColumnIndexCommand;
 import org.apache.parquet.cli.commands.ShowDictionaryCommand;
@@ -47,7 +48,6 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.parquet.cli.commands.TransCompressionCommand;
-import org.apache.parquet.hadoop.util.ColumnMasker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Set;
@@ -94,6 +94,7 @@ public class Main extends Configured implements Tool {
     jc.addCommand("head", new CatCommand(console, 10));
     jc.addCommand("column-index", new ShowColumnIndexCommand(console));
     jc.addCommand("column-size", new ColumnSizeCommand(console));
+    jc.addCommand("prune", new PruneColumnsCommand(console));
     jc.addCommand("trans-compression", new TransCompressionCommand(console));
     jc.addCommand("masking", new ColumnMaskingCommand(console));
   }
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
new file mode 100644
index 0000000..a38b68f
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/PruneColumnsCommand.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.util.ColumnPruner;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+@Parameters(commandDescription="Prune column(s) in a Parquet file and save it to a new file. " +
+  "The remaining columns are left unchanged.")
+public class PruneColumnsCommand extends BaseCommand {
+
+  public PruneColumnsCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(
+    names = {"-i", "--input"},
+    description = "<input parquet file path>",
+    required = false)
+  String input;
+
+  @Parameter(
+    names = {"-o", "--output"},
+    description = "<output parquet file path>",
+    required = false)
+  String output;
+
+  @Parameter(
+    names = {"-c", "--columns"},
+    description = "<columns to be pruned, in case sensitive dot format, for example a.b.c>",
+    required = false)
+  List<String> cols;
+
+  @Override
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(cols != null && cols.size() > 0,
+      "columns cannot be null or empty");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    ColumnPruner columnPruner = new ColumnPruner();
+    columnPruner.pruneColumns(getConf(), inPath, outPath, cols);
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+      "# Removes specified columns and write to a new Parquet file",
+      " -i input.parquet -o output.parquet -c col1_name"
+    );
+  }
+}
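
The new command is a thin wrapper over the ColumnPruner API added to
parquet-hadoop below. For callers that want to prune programmatically rather
than through the CLI, a minimal self-contained sketch (the file paths are
placeholders, and parquet-hadoop plus the Hadoop client libraries are assumed
to be on the classpath):

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.util.ColumnPruner;

    public class PruneExample {
      public static void main(String[] args) throws Exception {
        // Drop the "Gender" column; everything else is copied through
        // unchanged (the writer is created with the pruned schema and the
        // existing row groups are appended to it).
        new ColumnPruner().pruneColumns(
            new Configuration(),
            new Path("input.parquet"),   // placeholder input path
            new Path("output.parquet"),  // placeholder output path
            Arrays.asList("Gender"));
      }
    }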
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
similarity index 76%
copy from parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
copy to parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
index 2378f37..7ca9588 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
@@ -16,9 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.parquet.tools.command;
+package org.apache.parquet.hadoop.util;
 
-import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -27,63 +26,24 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class PruneColumnsCommand extends ArgsOnlyCommand {
+public class ColumnPruner {
 
-  private static final Logger LOG = LoggerFactory.getLogger(PruneColumnsCommand.class);
-
-  public static final String[] USAGE = new String[] {
-    "<input> <output> [<column> ...]",
-
-    "where <input> is the source parquet file",
-    "    <output> is the destination parquet file," +
-    "    [<column> ...] are the columns in the case sensitive dot format" +
-      " to be pruned, for example a.b.c"
-  };
-
-  /**
-   * Biggest number of columns we can prune.
-   */
-  private static final int MAX_COL_NUM = 100;
-  private Configuration conf;
-
-  public PruneColumnsCommand() {
-    super(3, MAX_COL_NUM + 1);
-
-    conf = new Configuration();
-  }
-
-  @Override
-  public String[] getUsageDescription() {
-    return USAGE;
-  }
-
-  @Override
-  public String getCommandDescription() {
-    return "Prune column(s) in a Parquet file and save it to a new file. " +
-      "The columns left are not changed.";
-  }
-
-  @Override
-  public void execute(CommandLine options) throws Exception {
-    List<String> args = options.getArgList();
-    Path inputFile = new Path(args.get(0));
-    Path outputFile = new Path(args.get(1));
-    List<String> cols = args.subList(2, args.size());
+  private static final Logger LOG = LoggerFactory.getLogger(ColumnPruner.class);
 
+  public void pruneColumns(Configuration conf, Path inputFile, Path outputFile, List<String> cols) throws IOException {
     Set<ColumnPath> prunePaths = convertToColumnPaths(cols);
-
     ParquetMetadata pmd = ParquetFileReader.readFooter(conf, inputFile, ParquetMetadataConverter.NO_FILTER);
     FileMetaData metaData = pmd.getFileMetaData();
     MessageType schema = metaData.getSchema();
@@ -117,7 +77,6 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
   }
 
   private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
-
     List<Type> fields = schema.getFields();
     List<String> currentPath = new ArrayList<>();
     List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
@@ -150,7 +109,7 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
         if (prunedFields.size() > 0) {
           prunedField = ((GroupType) field).withNewFields(prunedFields);
         }
-      } 
+      }
     }
 
     currentPath.remove(fieldName);
@@ -165,4 +124,3 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
     return prunePaths;
   }
 }
-
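
The pruning itself happens at the schema level: ColumnPruner reads the input
file's footer, recursively removes the requested dot-separated paths from the
MessageType, then appends the existing row groups to a ParquetFileWriter
created with the pruned schema. One consequence of the recursion in
pruneColumnsInField is that a group survives only while it still has at least
one remaining child. A worked example on a hypothetical Dremel-style schema
(the tests below use the same Links.Backward / Links paths):

    // Hypothetical input schema:
    message Document {
      required int64 DocId;
      optional group Links {
        repeated int64 Backward;
        repeated int64 Forward;
      }
    }

    // After pruning "Links.Backward":
    message Document {
      required int64 DocId;
      optional group Links {
        repeated int64 Forward;
      }
    }

Pruning "Links" itself, or all of its children, drops the whole group, since
pruneColumnsInField returns null when a group's pruned child list is empty.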
diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestPruneColumnsCommand.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
similarity index 89%
rename from parquet-tools/src/test/java/org/apache/parquet/tools/command/TestPruneColumnsCommand.java
rename to parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
index 764be54..ffe1ffa 100644
--- a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestPruneColumnsCommand.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnPrunerTest.java
@@ -17,12 +17,8 @@
  * under the License.
  */
 
-package org.apache.parquet.tools.command;
+package org.apache.parquet.hadoop.util;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.example.data.Group;
@@ -53,10 +49,10 @@ import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 
-public class TestPruneColumnsCommand {
+public class ColumnPrunerTest {
 
   private final int numRecord = 1000;
-  private PruneColumnsCommand command = new PruneColumnsCommand();
+  private ColumnPruner columnPruner = new ColumnPruner();
   private Configuration conf = new Configuration();
 
   @Test
@@ -65,9 +61,9 @@ public class TestPruneColumnsCommand {
     String inputFile = createParquetFile("input");
     String outputFile = createTempFile("output");
 
-    // Remove column
-    String cargs[] = {inputFile, outputFile, "Gender"};
-    executeCommandLine(cargs);
+    // Remove column Gender
+    List<String> cols = Arrays.asList("Gender");
+    columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
 
     // Verify the schema is not changed for the columns not pruned
     ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -95,7 +91,8 @@ public class TestPruneColumnsCommand {
 
     // Remove columns
     String cargs[] = {inputFile, outputFile, "Name", "Gender"};
-    executeCommandLine(cargs);
+    List<String> cols = Arrays.asList("Name", "Gender");
+    columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
 
     // Verify the schema is not changed for the columns not pruned
     ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -119,8 +116,8 @@ public class TestPruneColumnsCommand {
     // Create Parquet file
     String inputFile = createParquetFile("input");
     String outputFile = createTempFile("output");
-    String cargs[] = {inputFile, outputFile, "no_exist"};
-    executeCommandLine(cargs);
+    List<String> cols = Arrays.asList("no_exist");
+    columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
   }
 
   @Test
@@ -130,8 +127,8 @@ public class TestPruneColumnsCommand {
     String outputFile = createTempFile("output");
 
     // Remove nested column
-    String cargs[] = {inputFile, outputFile, "Links.Backward"};
-    executeCommandLine(cargs);
+    List<String> cols = Arrays.asList("Links.Backward");
+    columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
 
     // Verify the schema is not changed for the columns not pruned
     ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -158,8 +155,8 @@ public class TestPruneColumnsCommand {
     String outputFile = createTempFile("output");
 
     // Remove parent column. All of its children will be removed.
-    String cargs[] = {inputFile, outputFile, "Links"};
-    executeCommandLine(cargs);
+    List<String> cols = Arrays.asList("Links");
+    columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
 
     // Verify the schema is not changed for the columns not pruned
     ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
@@ -180,14 +177,8 @@ public class TestPruneColumnsCommand {
     // Create Parquet file
     String inputFile = createParquetFile("input");
     String outputFile = createTempFile("output");
-    String cargs[] = {inputFile, outputFile, "Links.Not_exists"};
-    executeCommandLine(cargs);
-  }
-
-  private void executeCommandLine(String[] cargs) throws Exception {
-    CommandLineParser parser = new PosixParser();
-    CommandLine cmd = parser.parse(new Options(), cargs, command.supportsExtraArgs());
-    command.execute(cmd);
+    List<String> cols = Arrays.asList("Links.Not_exists");
+    columnPruner.pruneColumns(conf, new Path(inputFile), new Path(outputFile), cols);
   }
 
   private void validateColumns(String inputFile, List<String> prunePaths) throws IOException {
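
A behavioral note, grounded in the warning loop that ColumnPruner keeps
(visible in the removed parquet-tools code below): pruning a column name that
is absent from the schema is not an error. Unmatched names only produce a
warning and the file is still rewritten, so a call like this (reusing the
imports from the sketch above; paths are placeholders) completes normally:

    // Assumption based on the LOG.warn loop retained in ColumnPruner:
    // an unmatched column name logs a warning instead of throwing,
    // and the output file is still written in full.
    new ColumnPruner().pruneColumns(new Configuration(),
        new Path("input.parquet"), new Path("output.parquet"),
        Arrays.asList("no_exist"));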
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
index 2378f37..548e989 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/PruneColumnsCommand.java
@@ -21,28 +21,12 @@ package org.apache.parquet.tools.command;
 import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.parquet.hadoop.util.ColumnPruner;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 public class PruneColumnsCommand extends ArgsOnlyCommand {
 
-  private static final Logger LOG = LoggerFactory.getLogger(PruneColumnsCommand.class);
-
   public static final String[] USAGE = new String[] {
     "<input> <output> [<column> ...]",
 
@@ -81,88 +65,9 @@ public class PruneColumnsCommand extends ArgsOnlyCommand {
     Path inputFile = new Path(args.get(0));
     Path outputFile = new Path(args.get(1));
     List<String> cols = args.subList(2, args.size());
+    ColumnPruner columnPruner = new ColumnPruner();
 
-    Set<ColumnPath> prunePaths = convertToColumnPaths(cols);
-
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, inputFile, ParquetMetadataConverter.NO_FILTER);
-    FileMetaData metaData = pmd.getFileMetaData();
-    MessageType schema = metaData.getSchema();
-    List<String> paths = new ArrayList<>();
-    getPaths(schema, paths, null);
-
-    for (String col : cols) {
-      if (!paths.contains(col)) {
-        LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inputFile.getName());
-      }
-    }
-
-    ParquetFileWriter writer = new ParquetFileWriter(conf,
-      pruneColumnsInSchema(schema, prunePaths), outputFile, ParquetFileWriter.Mode.CREATE);
-
-    writer.start();
-    writer.appendFile(HadoopInputFile.fromPath(inputFile, conf));
-    writer.end(metaData.getKeyValueMetaData());
-  }
-
-  // We have to rewrite getPaths because MessageType only get level 0 paths
-  private void getPaths(GroupType schema, List<String> paths, String parent) {
-    List<Type> fields = schema.getFields();
-    String prefix = (parent == null) ? "" : parent + ".";
-    for (Type field : fields) {
-      paths.add(prefix + field.getName());
-      if (field instanceof GroupType) {
-        getPaths(field.asGroupType(), paths, prefix + field.getName());
-      }
-    }
-  }
-
-  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> prunePaths) {
-
-    List<Type> fields = schema.getFields();
-    List<String> currentPath = new ArrayList<>();
-    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, prunePaths);
-    MessageType newSchema = new MessageType(schema.getName(), prunedFields);
-    return newSchema;
-  }
-
-  private List<Type> pruneColumnsInFields(List<Type> fields, List<String> currentPath, Set<ColumnPath> prunePaths) {
-    List<Type> prunedFields = new ArrayList<>();
-    for (Type childField : fields) {
-      Type prunedChildField = pruneColumnsInField(childField, currentPath, prunePaths);
-      if (prunedChildField != null) {
-        prunedFields.add(prunedChildField);
-      }
-    }
-    return prunedFields;
-  }
-
-  private Type pruneColumnsInField(Type field, List<String> currentPath, Set<ColumnPath> prunePaths) {
-    String fieldName = field.getName();
-    currentPath.add(fieldName);
-    ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
-    Type prunedField = null;
-    if (!prunePaths.contains(path)) {
-      if (field.isPrimitive()) {
-        prunedField = field;
-      } else {
-        List<Type> childFields = ((GroupType) field).getFields();
-        List<Type> prunedFields = pruneColumnsInFields(childFields, currentPath, prunePaths);
-        if (prunedFields.size() > 0) {
-          prunedField = ((GroupType) field).withNewFields(prunedFields);
-        }
-      } 
-    }
-
-    currentPath.remove(fieldName);
-    return prunedField;
-  }
-
-  private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
-    Set<ColumnPath> prunePaths = new HashSet<>();
-    for (String col : cols) {
-      prunePaths.add(ColumnPath.fromDotString(col));
-    }
-    return prunePaths;
+    columnPruner.pruneColumns(conf, inputFile, outputFile, cols);
   }
 }