You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2020/04/01 09:49:59 UTC
[parquet-mr] branch master updated: PARQUET-1821: Add 'column-size'
command to parquet-cli and parquet-tools (#774)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new d00b2f1 PARQUET-1821: Add 'column-size' command to parquet-cli and parquet-tools (#774)
d00b2f1 is described below
commit d00b2f105f9f732e310ed43c7bfb318213e1ac81
Author: shangxinli <31...@users.noreply.github.com>
AuthorDate: Wed Apr 1 02:49:46 2020 -0700
PARQUET-1821: Add 'column-size' command to parquet-cli and parquet-tools (#774)
---
.../src/main/java/org/apache/parquet/cli/Main.java | 2 +
.../parquet/cli/commands/ColumnSizeCommand.java | 137 +++++++++++++++++++++
.../cli/commands/ColumnSizeCommandTest.java | 91 ++++++++++++++
.../parquet/cli/commands/ParquetFileTest.java | 8 ++
.../parquet/tools/command/ColumnSizeCommand.java | 121 ++++++++++++++++++
.../org/apache/parquet/tools/command/Registry.java | 1 +
.../tools/command/TestColumnSizeCommand.java | 95 ++++++++++++++
7 files changed, 455 insertions(+)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
index fa69ce7..429174b 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.parquet.cli.commands.CSVSchemaCommand;
import org.apache.parquet.cli.commands.CatCommand;
import org.apache.parquet.cli.commands.CheckParquet251Command;
+import org.apache.parquet.cli.commands.ColumnSizeCommand;
import org.apache.parquet.cli.commands.ConvertCSVCommand;
import org.apache.parquet.cli.commands.ConvertCommand;
import org.apache.parquet.cli.commands.ParquetMetadataCommand;
@@ -89,6 +90,7 @@ public class Main extends Configured implements Tool {
jc.addCommand("cat", new CatCommand(console, 0));
jc.addCommand("head", new CatCommand(console, 10));
jc.addCommand("column-index", new ShowColumnIndexCommand(console));
+ jc.addCommand("column-size", new ColumnSizeCommand(console));
}
@Override
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnSizeCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnSizeCommand.java
new file mode 100644
index 0000000..2a59d0b
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ColumnSizeCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.avro.file.SeekableInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Formats;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Parameters(commandDescription="Print the column sizes of a parquet file")
+public class ColumnSizeCommand extends BaseCommand {
+
+ /**
+ * Creates the command.
+ *
+ * @param console logger passed on to the {@code BaseCommand} constructor
+ */
+ public ColumnSizeCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(description = "<parquet path>")
+ String target;
+
+ @Parameter(
+ names = {"-c", "--column", "--columns"},
+ description = "List of columns in the case sensitive dot format to be calculated, " +
+ "for example a.b.c. If an input column is intermediate column, all " +
+ "the child columns will be printed out. If no columns are set, all " +
+ "the columns will be printed out.",
+ required = false)
+ List<String> columns;
+
+  /**
+   * Prints the size in bytes and the ratio of each requested column, or of
+   * every column when none are requested. The ratio is a column's size divided
+   * by the summed size of all columns in the file.
+   *
+   * <p>A requested name matches the column with exactly that dot path and every
+   * column nested below it; the sizes and ratios of all matches are added up
+   * and reported on one line. A name that matches nothing is reported with
+   * size 0 and ratio 0.0.
+   *
+   * @return 0 once the report has been written
+   * @throws IllegalArgumentException if no target file was given
+   * @throws IOException if {@link #getColumnSizeInBytes(Path)} cannot read the file
+   */
+  @Override
+  public int run() throws IOException {
+    Preconditions.checkArgument(target != null,
+        "A Parquet file is required.");
+
+    Map<String, Long> columnSizes = getColumnSizeInBytes(new Path(target));
+    Map<String, Float> columnRatio = getColumnRatio(columnSizes);
+
+    if (columns != null && !columns.isEmpty()) {
+      // Only report the columns the user asked for.
+      for (String inputColumn : columns) {
+        // The trailing dot keeps "a.b" from matching a sibling such as "a.bc".
+        String childPrefix = inputColumn + ".";
+        long size = 0;
+        float ratio = 0;
+        for (Map.Entry<String, Long> entry : columnSizes.entrySet()) {
+          String column = entry.getKey();
+          if (column.equals(inputColumn) || column.startsWith(childPrefix)) {
+            size += entry.getValue();
+            ratio += columnRatio.get(column);
+          }
+        }
+        console.info(inputColumn + "->" + " Size In Bytes: " + size + " Size In Ratio: " + ratio);
+      }
+    } else {
+      for (Map.Entry<String, Long> entry : columnSizes.entrySet()) {
+        console.info(entry.getKey() + "->" + " Size In Bytes: " + entry.getValue()
+            + " Size In Ratio: " + columnRatio.get(entry.getKey()));
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Returns the usage examples for this command: a leading "#" description
+   * line followed by sample argument lists.
+   */
+  @Override
+  public List<String> getExamples() {
+    List<String> examples = Lists.newArrayList();
+    examples.add("# Print every column size in byte and ratio for a Parquet file");
+    examples.add("sample.parquet");
+    examples.add("sample.parquet -c col_1");
+    examples.add("sample.parquet --column col_2");
+    examples.add("sample.parquet --columns col_1 col_2");
+    examples.add("sample.parquet --columns col_1 col_2.sub_col_a");
+    return examples;
+  }
+
+  /**
+   * Adds up, per column, the total size of its chunks over every block listed
+   * in the file footer. Public so that automation tools can call it.
+   *
+   * @param inputFile Parquet file whose footer is read
+   * @return map from column dot path to the summed
+   *         {@code ColumnChunkMetaData#getTotalSize()} of its chunks
+   * @throws IOException if the footer cannot be read
+   */
+  public Map<String, Long> getColumnSizeInBytes(Path inputFile) throws IOException {
+    ParquetMetadata footer =
+        ParquetFileReader.readFooter(new Configuration(), inputFile, ParquetMetadataConverter.NO_FILTER);
+    Map<String, Long> sizeByColumn = new HashMap<>();
+
+    for (BlockMetaData blockMeta : footer.getBlocks()) {
+      for (ColumnChunkMetaData chunk : blockMeta.getColumns()) {
+        sizeByColumn.merge(chunk.getPath().toDotString(), chunk.getTotalSize(), Long::sum);
+      }
+    }
+
+    return sizeByColumn;
+  }
+
+  /**
+   * Converts absolute column sizes into each column's fraction of their sum.
+   * Public so that automation tools can call it.
+   *
+   * @param colSizes map from column name to size
+   * @return map from column name to its size divided by the sum of all sizes
+   */
+  public Map<String, Float> getColumnRatio(Map<String, Long> colSizes) {
+    long sumOfSizes = 0L;
+    for (long size : colSizes.values()) {
+      sumOfSizes += size;
+    }
+    float divisor = (float) sumOfSizes;
+
+    Map<String, Float> ratios = new HashMap<>();
+    colSizes.forEach((name, size) -> ratios.put(name, ((float) size) / divisor));
+    return ratios;
+  }
+}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ColumnSizeCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ColumnSizeCommandTest.java
new file mode 100644
index 0000000..b564d6e
--- /dev/null
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ColumnSizeCommandTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnSizeCommandTest extends ParquetFileTest {
+
+ private final int numRecord = 10000;
+ private ColumnSizeCommand command = new ColumnSizeCommand(createLogger());
+ private Configuration conf = new Configuration();
+
+  /** Runs the command against the file returned by parquetFile() and expects exit code 0. */
+  @Test
+  public void testColumnSizeCommand() throws IOException {
+    ColumnSizeCommand cmd = new ColumnSizeCommand(createLogger());
+    cmd.setConf(new Configuration());
+    cmd.target = parquetFile().getAbsolutePath();
+    Assert.assertEquals(0, cmd.run());
+  }
+
+  /**
+   * Writes a two-column file of random values and checks that the INT64 column
+   * "DocId" is reported as larger than the INT32 column "Num", both in bytes
+   * and as a ratio.
+   */
+  @Test
+  public void testColumnSize() throws Exception {
+    String inputFile = createParquetFile();
+    Map<String, Long> columnSizeInBytes = command.getColumnSizeInBytes(new Path(inputFile));
+    // JUnit takes the expected value first; the reverse order garbles failure messages.
+    assertEquals(2, columnSizeInBytes.size());
+    assertTrue(columnSizeInBytes.get("DocId") > columnSizeInBytes.get("Num"));
+    Map<String, Float> columnRatio = command.getColumnRatio(columnSizeInBytes);
+    assertTrue(columnRatio.get("DocId") > columnRatio.get("Num"));
+  }
+
+  /**
+   * Writes {@code numRecord} rows of random values with a required INT64 "DocId"
+   * column and a required INT32 "Num" column to a randomly named file.
+   *
+   * @return absolute path of the written file
+   * @throws IOException if the file cannot be written
+   */
+  private String createParquetFile() throws IOException {
+    MessageType schema = new MessageType("schema",
+        new PrimitiveType(REQUIRED, INT64, "DocId"),
+        new PrimitiveType(REQUIRED, INT32, "Num"));
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String outputPath = randomParquetFile().getAbsolutePath();
+    Random random = new Random();
+    int written = 0;
+    try (ParquetWriter writer =
+        ExampleParquetWriter.builder(new Path(outputPath)).withConf(conf).build()) {
+      while (written < numRecord) {
+        SimpleGroup row = new SimpleGroup(schema);
+        row.add("DocId", random.nextLong());
+        row.add("Num", random.nextInt());
+        writer.write(row);
+        written++;
+      }
+    }
+
+    return outputPath;
+  }
+}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java
index ad6d626..37f27b8 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ParquetFileTest.java
@@ -33,6 +33,7 @@ import org.junit.Before;
import java.io.File;
import java.io.IOException;
+import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.parquet.schema.Type.Repetition.*;
@@ -40,6 +41,8 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
public abstract class ParquetFileTest extends FileTest {
+ private Random rnd = new Random();
+
@Before
public void setUp() throws IOException {
createTestParquetFile();
@@ -50,6 +53,11 @@ public abstract class ParquetFileTest extends FileTest {
return new File(tmpDir, getClass().getSimpleName() + ".parquet");
}
+  /** Returns a file in the temp folder named after the test class plus a random long. */
+  protected File randomParquetFile() {
+    String fileName = getClass().getSimpleName() + rnd.nextLong() + ".parquet";
+    return new File(getTempFolder(), fileName);
+  }
+
private static MessageType createSchema() {
return new MessageType("schema",
new PrimitiveType(REQUIRED, PrimitiveTypeName.INT32, INT32_FIELD),
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnSizeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnSizeCommand.java
new file mode 100644
index 0000000..c574ae3
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ColumnSizeCommand.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.tools.Main;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ColumnSizeCommand extends ArgsOnlyCommand {
+
+  /**
+   * Usage text: the argument synopsis followed by a description of the arguments.
+   */
+  public static final String[] USAGE = new String[] {
+    "<input> [<column> ...]",
+    "where <input> is the parquet file to calculate the column size and" +
+    " [<column> ...] are the columns in the case sensitive dot format" +
+    " to be calculated, for example a.b.c. If an input column is an intermediate" +
+    " column, all the child columns will be printed out. If no columns are" +
+    " set, all the columns will be printed out."
+  };
+
+ /**
+ * Biggest number of columns we can calculate.
+ */
+ private static final int MAX_COL_NUM = 100;
+
+ /**
+ * Creates the command, passing 1 and {@code 1 + MAX_COL_NUM} to the
+ * {@code ArgsOnlyCommand} constructor. Presumably these are the minimum and
+ * maximum number of arguments (the input file plus up to MAX_COL_NUM
+ * columns) -- TODO confirm against ArgsOnlyCommand.
+ */
+ public ColumnSizeCommand() {
+ super(1, 1 + MAX_COL_NUM);
+ }
+
+ /** Returns the {@link #USAGE} array. */
+ @Override
+ public String[] getUsageDescription() {
+ return USAGE;
+ }
+
+ /** Returns the one-sentence description of what this command prints. */
+ @Override
+ public String getCommandDescription() {
+ return "Print out the size in bytes and ratio of column(s) in the input Parquet file";
+ }
+
+  /**
+   * Prints the size in bytes and the ratio of each requested column, or of
+   * every column when only the input file is given.
+   *
+   * @param options parsed command line; argument 0 is the input file and any
+   *                further arguments are column names in dot format
+   * @throws Exception if {@code super.execute} or reading the file fails
+   */
+  @Override
+  public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+    List<String> args = options.getArgList();
+    Map<String, Long> columnSizes = getColumnSizeInBytes(new Path(args.get(0)));
+    Map<String, Float> columnRatio = getColumnRatio(columnSizes);
+
+    if (args.size() <= 1) {
+      // No columns requested: report every column.
+      for (Map.Entry<String, Long> entry : columnSizes.entrySet()) {
+        Main.out.println(entry.getKey() + "->" + " Size In Bytes: " + entry.getValue()
+            + " Size In Ratio: " + columnRatio.get(entry.getKey()));
+      }
+      return;
+    }
+
+    for (String inputColumn : args.subList(1, args.size())) {
+      String childPrefix = inputColumn + ".";
+      long size = 0;
+      float ratio = 0;
+      for (Map.Entry<String, Long> entry : columnSizes.entrySet()) {
+        String column = entry.getKey();
+        // Match the column itself or any column nested below it.
+        if (column.equals(inputColumn) || column.startsWith(childPrefix)) {
+          size += entry.getValue();
+          ratio += columnRatio.get(column);
+        }
+      }
+      Main.out.println(inputColumn + "-> Size In Bytes: " + size + " Size In Ratio: " + ratio);
+    }
+  }
+
+  /**
+   * Adds up, per column, the total size of its chunks over every block listed
+   * in the file footer. Public so that automation tools can call it.
+   *
+   * @param inputFile Parquet file whose footer is read
+   * @return map from column dot path to the summed
+   *         {@code ColumnChunkMetaData#getTotalSize()} of its chunks
+   * @throws IOException if the footer cannot be read
+   */
+  public Map<String, Long> getColumnSizeInBytes(Path inputFile) throws IOException {
+    ParquetMetadata footer =
+        ParquetFileReader.readFooter(new Configuration(), inputFile, ParquetMetadataConverter.NO_FILTER);
+    Map<String, Long> sizeByColumn = new HashMap<>();
+
+    for (BlockMetaData blockMeta : footer.getBlocks()) {
+      for (ColumnChunkMetaData chunk : blockMeta.getColumns()) {
+        sizeByColumn.merge(chunk.getPath().toDotString(), chunk.getTotalSize(), Long::sum);
+      }
+    }
+
+    return sizeByColumn;
+  }
+
+  /**
+   * Converts absolute column sizes into each column's fraction of their sum.
+   * Public so that automation tools can call it.
+   *
+   * @param colSizes map from column name to size
+   * @return map from column name to its size divided by the sum of all sizes
+   */
+  public Map<String, Float> getColumnRatio(Map<String, Long> colSizes) {
+    long sumOfSizes = 0L;
+    for (long size : colSizes.values()) {
+      sumOfSizes += size;
+    }
+    float divisor = (float) sumOfSizes;
+
+    Map<String, Float> ratios = new HashMap<>();
+    colSizes.forEach((name, size) -> ratios.put(name, ((float) size) / divisor));
+    return ratios;
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
index 6eed4f7..59bb0f6 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java
@@ -36,6 +36,7 @@ public final class Registry {
registry.put("size", SizeCommand.class);
registry.put("column-index", ColumnIndexCommand.class);
registry.put("prune", PruneColumnsCommand.class);
+ registry.put("column-size", ColumnSizeCommand.class);
}
public static Map<String,Command> allCommands() {
diff --git a/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestColumnSizeCommand.java b/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestColumnSizeCommand.java
new file mode 100644
index 0000000..2dd2557
--- /dev/null
+++ b/parquet-tools/src/test/java/org/apache/parquet/tools/command/TestColumnSizeCommand.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.tools.command;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestColumnSizeCommand {
+
+ private final int numRecord = 10000;
+ private ColumnSizeCommand command = new ColumnSizeCommand();
+ private Configuration conf = new Configuration();
+ private Random rnd = new Random();
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  /**
+   * Writes a two-column file of random values and checks that the INT64 column
+   * "DocId" is reported as larger than the INT32 column "Num", both in bytes
+   * and as a ratio.
+   */
+  @Test
+  public void testColumnSize() throws Exception {
+    String inputFile = createParquetFile();
+    Map<String, Long> columnSizeInBytes = command.getColumnSizeInBytes(new Path(inputFile));
+    // JUnit takes the expected value first; the reverse order garbles failure messages.
+    assertEquals(2, columnSizeInBytes.size());
+    assertTrue(columnSizeInBytes.get("DocId") > columnSizeInBytes.get("Num"));
+    Map<String, Float> columnRatio = command.getColumnRatio(columnSizeInBytes);
+    assertTrue(columnRatio.get("DocId") > columnRatio.get("Num"));
+  }
+
+  /**
+   * Writes {@code numRecord} rows of random values with a required INT64 "DocId"
+   * column and a required INT32 "Num" column to a randomly named file.
+   *
+   * @return absolute path of the written file
+   * @throws IOException if the file cannot be written
+   */
+  private String createParquetFile() throws IOException {
+    MessageType schema = new MessageType("schema",
+        new PrimitiveType(REQUIRED, INT64, "DocId"),
+        new PrimitiveType(REQUIRED, INT32, "Num"));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = randomParquetFile().getAbsolutePath();
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file)).withConf(conf);
+    // Values come from the class-level rnd; a second local Random of the same
+    // name would only shadow that field.
+    try (ParquetWriter writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", rnd.nextLong());
+        g.add("Num", rnd.nextInt());
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+
+  /** Returns the root directory of the {@code tempFolder} rule. */
+  private File getTempFolder() {
+    return tempFolder.getRoot();
+  }
+
+  /** Returns a file in the temp folder named after the test class plus a random long. */
+  private File randomParquetFile() {
+    String fileName = getClass().getSimpleName() + rnd.nextLong() + ".parquet";
+    return new File(getTempFolder(), fileName);
+  }
+}