Posted to commits@parquet.apache.org by ga...@apache.org on 2020/04/01 09:49:59 UTC

[parquet-mr] branch master updated: PARQUET-1821: Add 'column-size' command to parquet-cli and parquet-tools (#774)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new d00b2f1  PARQUET-1821: Add 'column-size' command to parquet-cli and parquet-tools (#774)
d00b2f1 is described below

commit d00b2f105f9f732e310ed43c7bfb318213e1ac81
Author: shangxinli <31...@users.noreply.github.com>
AuthorDate: Wed Apr 1 02:49:46 2020 -0700

    PARQUET-1821: Add 'column-size' command to parquet-cli and parquet-tools (#774)
---
 .../src/main/java/org/apache/parquet/cli/Main.java |   2 +
 .../parquet/cli/commands/ColumnSizeCommand.java    | 137 +++++++++++++++++++++
 .../cli/commands/ColumnSizeCommandTest.java        |  91 ++++++++++++++
 .../parquet/cli/commands/ParquetFileTest.java      |   8 ++
 .../parquet/tools/command/ColumnSizeCommand.java   | 121 ++++++++++++++++++
 .../org/apache/parquet/tools/command/Registry.java |   1 +
 .../tools/command/TestColumnSizeCommand.java       |  95 ++++++++++++++
 7 files changed, 455 insertions(+)
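
For reference, a minimal usage sketch of the new parquet-cli subcommand. The "column-size" name and the -c/--column/--columns flags come from the code below; the `parquet` launcher name, the sample.parquet file, and the DocId/Num column names are only illustrative:

    parquet column-size sample.parquet
    parquet column-size sample.parquet --columns DocId Num

Each reported column is printed with its total size in bytes and its ratio of the combined column size.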

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index fa69ce7..429174b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.parquet.cli.commands.CSVSchemaCommand;
 import org.apache.parquet.cli.commands.CatCommand;
 import org.apache.parquet.cli.commands.CheckParquet251Command;
+import org.apache.parquet.cli.commands.ColumnSizeCommand;
 import org.apache.parquet.cli.commands.ConvertCSVCommand;
 import org.apache.parquet.cli.commands.ConvertCommand;
 import org.apache.parquet.cli.commands.ParquetMetadataCommand;
@@ -89,6 +90,7 @@ public class Main extends Configured implements Tool {
     jc.addCommand("cat", new CatCommand(console, 0));
     jc.addCommand("head", new CatCommand(console, 10));
     jc.addCommand("column-index", new ShowColumnIndexCommand(console));
+    jc.addCommand("column-size", new ColumnSizeCommand(console));
   }
 
   @Override
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnSizeCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnSizeCommand.java
new file mode 100644
index 0000000..2a59d0b
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnSizeCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.avro.file.SeekableInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Formats;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Parameters(commandDescription="Print the column sizes of a parquet file")
+public class ColumnSizeCommand extends BaseCommand {
+
+  public ColumnSizeCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<parquet path>")
+  String target;
+
+  @Parameter(
+    names = {"-c", "--column", "--columns"},
+    description = "List of columns in the case sensitive dot format to be calculated, " +
+                  "for example a.b.c. If an input column is intermediate column, all " +
+                  "the child columns will be printed out. If no columns are set, all " +
+                  "the columns will be printed out.",
+    required = false)
+  List<String> columns;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(target != null,
+      "A Parquet file is required.");
+
+    Path inputFile = new Path(target);
+    Map<String, Long> columnSizes = getColumnSizeInBytes(inputFile);
+    Map<String, Float> columnRatio = getColumnRatio(columnSizes);
+
+    // If user defined columns, only print out size for those columns
+    if (columns != null && columns.size() > 0) {
+      for (String inputColumn : columns) {
+        long size = 0;
+        float ratio = 0;
+        for (String column : columnSizes.keySet()) {
+          if (column.equals(inputColumn) || column.startsWith(inputColumn + ".")) {
+            size += columnSizes.get(column);
+            ratio += columnRatio.get(column);
+          }
+        }
+        console.info(inputColumn + "->" + " Size In Bytes: " + size + " Size In Ratio: " + ratio);
+      }
+    } else {
+      for (String column : columnSizes.keySet()) {
+        console.info(column + "->" + " Size In Bytes: " + columnSizes.get(column)
+          + " Size In Ratio: " + columnRatio.get(column));
+      }
+    }
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Print every column size in byte and ratio for a Parquet file",
+        "sample.parquet",
+        "sample.parquet -c col_1",
+        "sample.parquet --column col_2",
+        "sample.parquet --columns col_1 col_2",
+        "sample.parquet --columns col_1 col_2.sub_col_a"
+    );
+  }
+
+  // Make it public to allow some automation tools to call it
+  public Map<String, Long> getColumnSizeInBytes(Path inputFile) throws IOException {
+    Map<String, Long> colSizes = new HashMap<>();
+    ParquetMetadata pmd = ParquetFileReader.readFooter(new Configuration(), inputFile, ParquetMetadataConverter.NO_FILTER);
+
+    for (BlockMetaData block : pmd.getBlocks()) {
+      for (ColumnChunkMetaData column : block.getColumns()) {
+        String colName = column.getPath().toDotString();
+        colSizes.put(colName, column.getTotalSize() + colSizes.getOrDefault(colName, 0L));
+      }
+    }
+
+    return colSizes;
+  }
+
+  // Make it public to allow some automation tools to call it
+  public Map<String, Float> getColumnRatio(Map<String, Long> colSizes) {
+    long totalSize = colSizes.values().stream().reduce(0L, Long::sum);
+    Map<String, Float> colRatio = new HashMap<>();
+
+    for (Map.Entry<String, Long> entry : colSizes.entrySet()) {
+      colRatio.put(entry.getKey(), ((float) entry.getValue()) / ((float) totalSize));
+    }
+
+    return colRatio;
+  }
+}
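
The getColumnSizeInBytes and getColumnRatio helpers above are public precisely so that automation can reuse them without going through the CLI. A minimal sketch of such programmatic use; the ColumnSizeReport class name and the sample.parquet path are illustrative, not part of this commit:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.cli.commands.ColumnSizeCommand;
    import org.slf4j.LoggerFactory;

    public class ColumnSizeReport {
      public static void main(String[] args) throws IOException {
        // Reuse the command's public helpers instead of parsing its console output.
        ColumnSizeCommand cmd =
            new ColumnSizeCommand(LoggerFactory.getLogger(ColumnSizeReport.class));
        Map<String, Long> sizes = cmd.getColumnSizeInBytes(new Path("sample.parquet"));
        Map<String, Float> ratios = cmd.getColumnRatio(sizes);
        // Print each column's footer-reported size and its share of the total.
        sizes.forEach((col, bytes) ->
            System.out.println(col + ": " + bytes + " bytes (" + ratios.get(col) + " of total)"));
      }
    }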
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ColumnSizeCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ColumnSizeCommandTest.java
new file mode 100644
index 0000000..b564d6e
--- /dev/null
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ColumnSizeCommandTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnSizeCommandTest extends ParquetFileTest {
+
+  private final int numRecord = 10000;
+  private ColumnSizeCommand command = new ColumnSizeCommand(createLogger());
+  private Configuration conf = new Configuration();
+
+  @Test
+  public void testColumnSizeCommand() throws IOException {
+    File file = parquetFile();
+    ColumnSizeCommand command = new ColumnSizeCommand(createLogger());
+    command.target = file.getAbsolutePath();
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+  }
+
+  @Test
+  public void testColumnSize() throws Exception {
+    String inputFile = createParquetFile();
+    Map<String, Long> columnSizeInBytes = command.getColumnSizeInBytes(new Path(inputFile));
+    assertEquals(2, columnSizeInBytes.size());
+    assertTrue(columnSizeInBytes.get("DocId") > columnSizeInBytes.get("Num"));
+    Map<String, Float> columnRatio = command.getColumnRatio(columnSizeInBytes);
+    assertTrue(columnRatio.get("DocId") > columnRatio.get("Num"));
+  }
+
+  private String createParquetFile() throws IOException {
+    MessageType schema = new MessageType("schema",
+      new PrimitiveType(REQUIRED, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, INT32, "Num"));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = randomParquetFile().getAbsolutePath();
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)).withConf(conf);
+    Random rnd = new Random();
+    try (ParquetWriter writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", rnd.nextLong());
+        g.add("Num", rnd.nextInt());
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java
index ad6d626..37f27b8 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java
@@ -33,6 +33,7 @@ import org.junit.Before;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.parquet.schema.Type.Repetition.*;
@@ -40,6 +41,8 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public abstract class ParquetFileTest extends FileTest {
 
+  private Random rnd = new Random();
+
   @Before
   public void setUp() throws IOException {
     createTestParquetFile();
@@ -50,6 +53,11 @@ public abstract class ParquetFileTest extends FileTest {
     return new File(tmpDir, getClass().getSimpleName() + ".parquet");
   }
 
+  protected File randomParquetFile() {
+    File tmpDir = getTempFolder();
+    return new File(tmpDir, getClass().getSimpleName() + rnd.nextLong() + ".parquet");
+  }
+
   private static MessageType createSchema() {
     return new MessageType("schema",
       new PrimitiveType(REQUIRED, PrimitiveTypeName.INT32, INT32_FIELD),
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnSizeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnSizeCommand.java
new file mode 100644
index 0000000..c574ae3
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnSizeCommand.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.tools.Main;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ColumnSizeCommand extends ArgsOnlyCommand {
+
+  public static final String[] USAGE = new String[] {
+    "<input>",
+    "where <input> is the parquet file to calculate the column size" +
+    "     [<column> ...] are the columns in the case sensitive dot format" +
+    "     to be calculated, for example a.b.c. If an input column is intermediate" +
+    "     column, all the child columns will be printed out. If no columns are" +
+    "     set, all the columns will be printed out."
+  };
+
+  /**
+   * Biggest number of columns we can calculate.
+   */
+  private static final int MAX_COL_NUM = 100;
+
+  public ColumnSizeCommand() {
+    super(1, 1 + MAX_COL_NUM);
+  }
+
+  @Override
+  public String[] getUsageDescription() {
+    return USAGE;
+  }
+
+  @Override
+  public String getCommandDescription() {
+    return "Print out the size in bytes and ratio of column(s) in the input Parquet file";
+  }
+
+  @Override
+  public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+    List<String> args = options.getArgList();
+    Path inputFile = new Path(args.get(0));
+
+    Map<String, Long> columnSizes = getColumnSizeInBytes(inputFile);
+    Map<String, Float> columnRatio = getColumnRatio(columnSizes);
+
+    if (args.size() > 1) {
+      for (String inputColumn : args.subList(1, args.size())) {
+        long size = 0;
+        float ratio = 0;
+        for (String column : columnSizes.keySet()) {
+          if (column.equals(inputColumn) || column.startsWith(inputColumn + ".")) {
+            size += columnSizes.get(column);
+            ratio += columnRatio.get(column);
+          }
+        }
+        Main.out.println(inputColumn + "-> Size In Bytes: " + size + " Size In Ratio: " + ratio);
+      }
+    } else {
+      for (String column : columnSizes.keySet()) {
+        Main.out.println(column + "->" + " Size In Bytes: " + columnSizes.get(column)
+          + " Size In Ratio: " + columnRatio.get(column));
+      }
+    }
+  }
+
+  // Make it public to allow some automation tools to call it
+  public Map<String, Long> getColumnSizeInBytes(Path inputFile) throws IOException {
+    Map<String, Long> colSizes = new HashMap<>();
+    ParquetMetadata pmd = ParquetFileReader.readFooter(new Configuration(), inputFile, ParquetMetadataConverter.NO_FILTER);
+
+    for (BlockMetaData block : pmd.getBlocks()) {
+      for (ColumnChunkMetaData column : block.getColumns()) {
+        String colName = column.getPath().toDotString();
+        colSizes.put(colName, column.getTotalSize() + colSizes.getOrDefault(colName, 0L));
+      }
+    }
+
+    return colSizes;
+  }
+
+  // Make it public to allow some automation tools to call it
+  public Map<String, Float> getColumnRatio(Map<String, Long> colSizes) {
+    long totalSize = colSizes.values().stream().reduce(0L, Long::sum);
+    Map<String, Float> colRatio = new HashMap<>();
+
+    for (Map.Entry<String, Long> entry : colSizes.entrySet()) {
+      colRatio.put(entry.getKey(), ((float) entry.getValue()) / ((float) totalSize));
+    }
+
+    return colRatio;
+  }
+}
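
The parquet-tools variant is registered under the same "column-size" name (see the Registry change below). A minimal invocation sketch, assuming the usual parquet-tools jar launcher; the jar name with <VERSION>, sample.parquet, and the DocId/Num columns are illustrative:

    java -jar parquet-tools-<VERSION>.jar column-size sample.parquet
    java -jar parquet-tools-<VERSION>.jar column-size sample.parquet DocId Num

Each line of output has the form "<column>-> Size In Bytes: <bytes> Size In Ratio: <ratio>".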
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
index 6eed4f7..59bb0f6 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
@@ -36,6 +36,7 @@ public final class Registry {
     registry.put("size", SizeCommand.class);
     registry.put("column-index", ColumnIndexCommand.class);
     registry.put("prune", PruneColumnsCommand.class);
+    registry.put("column-size", ColumnSizeCommand.class);
   }
 
   public static Map<String,Command> allCommands() {
diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestColumnSizeCommand.java b/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestColumnSizeCommand.java
new file mode 100644
index 0000000..2dd2557
--- /dev/null
+++ b/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestColumnSizeCommand.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.tools.command;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestColumnSizeCommand {
+
+  private final int numRecord = 10000;
+  private ColumnSizeCommand command = new ColumnSizeCommand();
+  private Configuration conf = new Configuration();
+  private Random rnd = new Random();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Test
+  public void testColumnSize() throws Exception {
+    String inputFile = createParquetFile();
+    Map<String, Long> columnSizeInBytes = command.getColumnSizeInBytes(new Path(inputFile));
+    assertEquals(2, columnSizeInBytes.size());
+    assertTrue(columnSizeInBytes.get("DocId") > columnSizeInBytes.get("Num"));
+    Map<String, Float> columnRatio = command.getColumnRatio(columnSizeInBytes);
+    assertTrue(columnRatio.get("DocId") > columnRatio.get("Num"));
+  }
+
+  private String createParquetFile() throws IOException {
+    MessageType schema = new MessageType("schema",
+      new PrimitiveType(REQUIRED, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, INT32, "Num"));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = randomParquetFile().getAbsolutePath();
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)).withConf(conf);
+    Random rnd = new Random();
+    try (ParquetWriter writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", rnd.nextLong());
+        g.add("Num", rnd.nextInt());
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+
+  private File getTempFolder() {
+    return this.tempFolder.getRoot();
+  }
+
+  private File randomParquetFile() {
+    File tmpDir = getTempFolder();
+    return new File(tmpDir, getClass().getSimpleName() + rnd.nextLong() + ".parquet");
+  }
+}