You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2017/07/28 23:25:29 UTC
[4/4] parquet-mr git commit: PARQUET-777: Add Parquet CLI.
PARQUET-777: Add Parquet CLI.
This adds a new parquet-cli module with an improved command-line tool. The parquet-cli/README.md file has instructions for building and testing locally.
Author: Ryan Blue <bl...@apache.org>
Author: Tom White <to...@cloudera.com>
Closes #384 from rdblue/PARQUET-777-add-parquet-cli and squashes the following commits:
de49eff [Ryan Blue] PARQUET-777: Move dynamic support classes, add tests.
affdfb9 [Ryan Blue] PARQUET-777: Update for review feedback.
f953fd4 [Ryan Blue] PARQUET-777: Update README.md with better instructions.
aed223d [Tom White] Replace source file headers with Apache header.
d718363 [Ryan Blue] PARQUET-777: Add Parquet CLI.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/ddbeb4dd
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/ddbeb4dd
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/ddbeb4dd
Branch: refs/heads/master
Commit: ddbeb4dd17d9c219b99b1e66d8be28efe37e3aa6
Parents: df9f8d8
Author: Ryan Blue <bl...@apache.org>
Authored: Fri Jul 28 16:25:21 2017 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Fri Jul 28 16:25:21 2017 -0700
----------------------------------------------------------------------
NOTICE | 38 ++
parquet-cli/README.md | 107 ++++
parquet-cli/pom.xml | 153 +++++
.../org/apache/parquet/cli/BaseCommand.java | 397 ++++++++++++
.../java/org/apache/parquet/cli/Command.java | 40 ++
.../cli/HadoopFileSystemURLStreamHandler.java | 79 +++
.../main/java/org/apache/parquet/cli/Help.java | 147 +++++
.../main/java/org/apache/parquet/cli/Main.java | 178 ++++++
.../main/java/org/apache/parquet/cli/Util.java | 335 ++++++++++
.../parquet/cli/commands/CSVSchemaCommand.java | 131 ++++
.../apache/parquet/cli/commands/CatCommand.java | 106 ++++
.../cli/commands/CheckParquet251Command.java | 351 ++++++++++
.../parquet/cli/commands/ConvertCSVCommand.java | 204 ++++++
.../parquet/cli/commands/ConvertCommand.java | 165 +++++
.../cli/commands/ParquetMetadataCommand.java | 180 ++++++
.../parquet/cli/commands/SchemaCommand.java | 138 ++++
.../cli/commands/ShowDictionaryCommand.java | 131 ++++
.../parquet/cli/commands/ShowPagesCommand.java | 217 +++++++
.../parquet/cli/commands/ToAvroCommand.java | 141 ++++
.../org/apache/parquet/cli/csv/AvroCSV.java | 258 ++++++++
.../apache/parquet/cli/csv/AvroCSVReader.java | 121 ++++
.../apache/parquet/cli/csv/CSVProperties.java | 111 ++++
.../apache/parquet/cli/csv/RecordBuilder.java | 200 ++++++
.../org/apache/parquet/cli/json/AvroJson.java | 636 +++++++++++++++++++
.../apache/parquet/cli/json/AvroJsonReader.java | 85 +++
.../org/apache/parquet/cli/util/Codecs.java | 50 ++
.../apache/parquet/cli/util/Expressions.java | 391 ++++++++++++
.../org/apache/parquet/cli/util/Formats.java | 47 ++
.../apache/parquet/cli/util/GetClassLoader.java | 39 ++
.../parquet/cli/util/RecordException.java | 53 ++
.../parquet/cli/util/RuntimeIOException.java | 31 +
.../org/apache/parquet/cli/util/Schemas.java | 498 +++++++++++++++
.../cli/util/SeekableFSDataInputStream.java | 76 +++
parquet-cli/src/main/resources/META-INF/LICENSE | 348 ++++++++++
parquet-cli/src/main/resources/META-INF/NOTICE | 45 ++
.../src/main/resources/cli-logging.properties | 51 ++
.../java/org/apache/parquet/Exceptions.java | 34 +
.../apache/parquet/util/DynConstructors.java | 273 ++++++++
.../org/apache/parquet/util/DynMethods.java | 520 +++++++++++++++
.../test/java/org/apache/parquet/TestUtils.java | 70 ++
.../org/apache/parquet/util/Concatenator.java | 82 +++
.../parquet/util/TestDynConstructors.java | 235 +++++++
.../org/apache/parquet/util/TestDynMethods.java | 410 ++++++++++++
pom.xml | 7 +
44 files changed, 7909 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index a9b6c56..289b092 100644
--- a/NOTICE
+++ b/NOTICE
@@ -54,3 +54,41 @@ its NOTICE file:
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+--------------------------------------------------------------------------------
+
+This project includes code from Kite, developed at Cloudera, Inc. with
+the following copyright notice:
+
+| Copyright 2013 Cloudera Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+| http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+
+--------------------------------------------------------------------------------
+
+This project includes code from Netflix, Inc. with the following copyright
+notice:
+
+| Copyright 2016 Netflix, Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+| http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/README.md
----------------------------------------------------------------------
diff --git a/parquet-cli/README.md b/parquet-cli/README.md
new file mode 100644
index 0000000..d17d719
--- /dev/null
+++ b/parquet-cli/README.md
@@ -0,0 +1,107 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+
+## Building
+
+You can build this project using maven:
+
+```
+mvn clean install -DskipTests
+```
+
+
+## Running
+
+The build produces a shaded Jar that can be run using the `hadoop` command:
+
+```
+hadoop jar parquet-cli-1.9.1-runtime.jar org.apache.parquet.cli.Main
+```
+
+For a shorter command-line invocation, add an alias to your shell like this:
+
+```
+alias parquet="hadoop jar /path/to/parquet-cli-1.9.1-runtime.jar org.apache.parquet.cli.Main --dollar-zero parquet"
+```
+
+### Running without Hadoop
+
+To run from the target directory instead of using the `hadoop` command, first copy the dependencies to a folder:
+
+```
+mvn dependency:copy-dependencies
+```
+
+Then, run the command-line tool with `target/dependency/*` added to the classpath:
+
+```
+java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main
+```
+
+
+### Help
+
+The `parquet` tool includes help for the included commands:
+
+```
+parquet help
+```
+```
+Usage: parquet [options] [command] [command options]
+
+ Options:
+
+ -v, --verbose, --debug
+ Print extra debugging information
+
+ Commands:
+
+ help
+ Retrieves details on the functions of other commands
+ meta
+ Print a Parquet file's metadata
+ pages
+ Print page summaries for a Parquet file
+ dictionary
+ Print dictionaries for a Parquet column
+ check-stats
+ Check Parquet files for corrupt page and column stats (PARQUET-251)
+ schema
+ Print the Avro schema for a file
+ csv-schema
+ Build a schema from a CSV data sample
+ convert-csv
+ Create a file from CSV data
+ convert
+ Create a Parquet file from a data file
+ to-avro
+ Create an Avro file from a data file
+ cat
+ Print the first N records from a file
+ head
+ Print the first N records from a file
+
+ Examples:
+
+ # print information for create
+ parquet help create
+
+ See 'parquet help <command>' for more information on a specific command.
+```
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
new file mode 100644
index 0000000..a9cd21b
--- /dev/null
+++ b/parquet-cli/pom.xml
@@ -0,0 +1,153 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet</artifactId>
+ <relativePath>../pom.xml</relativePath>
+ <version>1.9.1-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>parquet-cli</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Parquet Command-line</name>
+ <url>https://parquet.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>${opencsv.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ <version>${jcommander.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons-codec.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- This module disables semver checks because it is not a public API.
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>runtime</shadedClassifierName>
+ <minimizeJar>false</minimizeJar>
+ <filters>
+ <filter>
+ <artifact>org.xerial.snappy:*</artifact>
+ <excludes>
+ <exclude>**/LICENSE</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <exclude>META-INF/NOTICE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <artifactSet>
+ <includes>
+ <include>*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <!-- relocate Avro in the runtime jar to avoid conflicts with
+ on-cluster Avro versions.
+ -->
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>${shade.prefix}.org.apache.avro</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
new file mode 100644
index 0000000..4b47164
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Resources;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.cli.json.AvroJsonReader;
+import org.apache.parquet.cli.util.Formats;
+import org.apache.parquet.cli.util.GetClassLoader;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.slf4j.Logger;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.security.AccessController;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public abstract class BaseCommand implements Command, Configurable {
+
+ @VisibleForTesting
+ static final Charset UTF8 = Charset.forName("utf8");
+
+ private static final String RESOURCE_URI_SCHEME = "resource";
+ private static final String STDIN_AS_SOURCE = "stdin";
+
+ protected final Logger console;
+
+ private Configuration conf = null;
+ private LocalFileSystem localFS = null;
+
+ public BaseCommand(Logger console) {
+ this.console = console;
+ }
+
+ /**
+ * Returns the local file system, creating it from {@link #getConf()} on the
+ * first call and reusing the cached instance on later calls.
+ *
+ * @return FileSystem to use when no file system scheme is present in a path
+ * @throws IOException if the local file system cannot be obtained
+ */
+ public FileSystem defaultFS() throws IOException {
+ if (localFS == null) {
+ this.localFS = FileSystem.getLocal(getConf());
+ }
+ return localFS;
+ }
+
+ /**
+ * Output content to the console or a file.
+ *
+ * When {@code filename} is null or "-", the content is logged to
+ * {@code console} at info level. Otherwise it is written to the named file
+ * as UTF-8 bytes, overwriting a file that already exists.
+ *
+ * This will not produce checksum files.
+ *
+ * @param content String content to write
+ * @param console A {@link Logger} for writing to the console
+ * @param filename The destination {@link Path} as a String
+ * @throws IOException if the file cannot be created, written, or closed
+ */
+ public void output(String content, Logger console, String filename)
+ throws IOException {
+ if (filename == null || "-".equals(filename)) {
+ console.info(content);
+ return;
+ }
+ // try-with-resources always closes the stream and, if both write() and
+ // close() fail, keeps the write failure as the primary exception
+ try (FSDataOutputStream outgoing = create(filename)) {
+ outgoing.write(content.getBytes(UTF8));
+ }
+ }
+
+ /**
+ * Creates a file and returns an open {@link FSDataOutputStream}.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ *
+ * This will not produce checksum files and will overwrite a file that
+ * already exists.
+ *
+ * @param filename The filename to create
+ * @return An open FSDataOutputStream
+ * @throws IOException if the path cannot be qualified or the file cannot be created
+ */
+ public FSDataOutputStream create(String filename) throws IOException {
+ return create(filename, true);
+ }
+
+ /**
+ * Creates a file and returns an open {@link FSDataOutputStream}.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ *
+ * Unlike {@link #create(String)}, this writes through the file system as-is
+ * instead of unwrapping a {@link ChecksumFileSystem}, so checksum files are
+ * produced when the target file system writes them. This will overwrite a
+ * file that already exists.
+ *
+ * @param filename The filename to create
+ * @return An open FSDataOutputStream
+ * @throws IOException if the path cannot be qualified or the file cannot be created
+ */
+ public FSDataOutputStream createWithChecksum(String filename)
+ throws IOException {
+ return create(filename, false);
+ }
+
+ /**
+ * Creates (or overwrites) the named file and returns the open stream.
+ *
+ * @param filename The filename to create
+ * @param noChecksum when true and the target is a {@link ChecksumFileSystem},
+ * the file is written through its raw file system instead
+ * @return An open FSDataOutputStream
+ * @throws IOException if the path cannot be qualified or the file cannot be created
+ */
+ private FSDataOutputStream create(String filename, boolean noChecksum)
+ throws IOException {
+ Path target = qualifiedPath(filename);
+ // the path was qualified against the default FS but may live in another one
+ FileSystem targetFS = target.getFileSystem(getConf());
+ FileSystem writeFS = (noChecksum && targetFS instanceof ChecksumFileSystem)
+ ? ((ChecksumFileSystem) targetFS).getRawFileSystem()
+ : targetFS;
+ return writeFS.create(target, true /* overwrite */);
+ }
+
+ /**
+ * Returns a qualified {@link Path} for the {@code filename}.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ * The path is qualified against the default FS URI, using the default FS's
+ * qualified form of "." as the working directory.
+ *
+ * @param filename The filename to qualify
+ * @return A qualified Path for the filename
+ * @throws IOException if the default file system cannot be obtained
+ */
+ public Path qualifiedPath(String filename) throws IOException {
+ Path cwd = defaultFS().makeQualified(new Path("."));
+ return new Path(filename).makeQualified(defaultFS().getUri(), cwd);
+ }
+
+ /**
+ * Returns a {@link URI} for the {@code filename} that is a qualified Path or
+ * a resource URI.
+ *
+ * A filename whose scheme is {@code resource} is returned as parsed, without
+ * qualification. Any other filename is qualified with
+ * {@link #qualifiedPath(String)}.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ *
+ * @param filename The filename to qualify
+ * @return A qualified URI for the filename
+ * @throws IOException if the default file system cannot be obtained
+ * @throws IllegalArgumentException if {@code filename} cannot be parsed by
+ * {@link URI#create(String)}
+ */
+ public URI qualifiedURI(String filename) throws IOException {
+ URI fileURI = URI.create(filename);
+ if (RESOURCE_URI_SCHEME.equals(fileURI.getScheme())) {
+ return fileURI;
+ } else {
+ return qualifiedPath(filename).toUri();
+ }
+ }
+
+ /**
+ * Opens an existing file or resource.
+ *
+ * The special name {@code "stdin"} returns {@link System#in} without opening
+ * anything. A URI with the {@code resource} scheme is located with
+ * {@link Resources#getResource(String)}. Every other name is opened through
+ * the {@link FileSystem} that owns its qualified path.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ *
+ * @param filename The filename to open.
+ * @return An open InputStream with the file contents
+ * @throws IOException if the stream cannot be opened
+ * @throws IllegalArgumentException If the file does not exist
+ * (NOTE(review): this matches the resource branch; a missing file on a
+ * FileSystem presumably surfaces as an IOException instead -- confirm)
+ */
+ public InputStream open(String filename) throws IOException {
+ if (STDIN_AS_SOURCE.equals(filename)) {
+ return System.in;
+ }
+
+ URI uri = qualifiedURI(filename);
+ if (RESOURCE_URI_SCHEME.equals(uri.getScheme())) {
+ return Resources.getResource(uri.getRawSchemeSpecificPart()).openStream();
+ } else {
+ Path filePath = new Path(uri);
+ // even though it was qualified using the default FS, it may not be in it
+ FileSystem fs = filePath.getFileSystem(getConf());
+ return fs.open(filePath);
+ }
+ }
+
+ /**
+ * Opens a file as a {@link SeekableInput}.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ *
+ * @param filename The filename to open
+ * @return A SeekableInput backed by the file's {@link FileSystem}
+ * @throws IOException if the path cannot be qualified or the file cannot be opened
+ */
+ public SeekableInput openSeekable(String filename) throws IOException {
+ Path seekablePath = qualifiedPath(filename);
+ // the path was qualified against the default FS but may live in another one
+ return new SeekableFSDataInputStream(
+ seekablePath.getFileSystem(getConf()), seekablePath);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ HadoopFileSystemURLStreamHandler.setDefaultConf(conf); // also replaces the static default used by URL stream handlers created after this call
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Returns a {@link ClassLoader} for a set of jars and directories.
+ *
+ * Either list may be null. Directory URLs are placed before jar URLs.
+ *
+ * @param jars A list of jar paths
+ * @param paths A list of directories containing .class files
+ * @return the ClassLoader produced by {@link GetClassLoader} for the URLs
+ * @throws MalformedURLException if a path cannot be converted to a URL
+ * @throws IllegalArgumentException if a jar or directory does not exist, is
+ * not of the expected kind, or cannot be read
+ */
+ protected static ClassLoader loaderFor(List<String> jars, List<String> paths)
+ throws MalformedURLException {
+ return AccessController.doPrivileged(new GetClassLoader(urls(jars, paths)));
+ }
+
+ /**
+ * Returns a {@link ClassLoader} for a set of jars.
+ *
+ * @param jars A list of jar paths
+ * @return the ClassLoader produced by {@link GetClassLoader} for the jar URLs
+ * @throws MalformedURLException if a jar path cannot be converted to a URL
+ * @throws IllegalArgumentException if a jar does not exist, is not a file,
+ * or cannot be read
+ */
+ protected static ClassLoader loaderForJars(List<String> jars)
+ throws MalformedURLException {
+ return AccessController.doPrivileged(new GetClassLoader(urls(jars, null)));
+ }
+
+ /**
+ * Returns a {@link ClassLoader} for a set of directories.
+ *
+ * @param paths A list of directories containing .class files
+ * @return the ClassLoader produced by {@link GetClassLoader} for the directory URLs
+ * @throws MalformedURLException if a directory cannot be converted to a URL
+ * @throws IllegalArgumentException if a directory does not exist, is not a
+ * directory, or cannot be read and executed
+ */
+ protected static ClassLoader loaderForPaths(List<String> paths)
+ throws MalformedURLException {
+ return AccessController.doPrivileged(new GetClassLoader(urls(null, paths)));
+ }
+
+ /**
+ * Validates local jars and class directories and converts them to URLs.
+ *
+ * Directory URLs come first in the result, followed by jar URLs, each in
+ * the order given.
+ *
+ * @param jars A list of jar paths, or null for none
+ * @param dirs A list of directories containing .class files, or null for none
+ * @return the URLs for all directories and jars
+ * @throws MalformedURLException if a path cannot be converted to a URL
+ * @throws IllegalArgumentException if an entry does not exist, is not of the
+ * expected kind, or lacks the required permissions
+ */
+ private static List<URL> urls(List<String> jars, List<String> dirs)
+ throws MalformedURLException {
+ // check the additional jars and lib directories in the local FS
+ final List<URL> urls = Lists.newArrayList();
+ if (dirs != null) {
+ for (String lib : dirs) {
+ // final URLs must end in '/' for URLClassLoader
+ File path = lib.endsWith("/") ? new File(lib) : new File(lib + "/");
+ Preconditions.checkArgument(path.exists(),
+ "Lib directory does not exist: " + lib);
+ Preconditions.checkArgument(path.isDirectory(),
+ "Not a directory: " + lib);
+ Preconditions.checkArgument(path.canRead() && path.canExecute(),
+ "Insufficient permissions to access lib directory: " + lib);
+ urls.add(path.toURI().toURL());
+ }
+ }
+ if (jars != null) {
+ for (String jar : jars) {
+ File path = new File(jar);
+ Preconditions.checkArgument(path.exists(),
+ "Jar file does not exist: " + jar);
+ Preconditions.checkArgument(path.isFile(),
+ "Not a file: " + jar);
+ Preconditions.checkArgument(path.canRead(),
+ "Cannot read jar file: " + jar);
+ urls.add(path.toURI().toURL());
+ }
+ }
+ return urls;
+ }
+
+ protected <D> Iterable<D> openDataFile(final String source, Schema projection)
+ throws IOException {
+ Formats.Format format = Formats.detectFormat(open(source)); // NOTE(review): the stream opened here is not closed in this method
+ switch (format) {
+ case PARQUET:
+ Configuration conf = new Configuration(getConf()); // copy, so the projection settings below do not change getConf()
+ // TODO: add these to the reader builder
+ AvroReadSupport.setRequestedProjection(conf, projection);
+ AvroReadSupport.setAvroReadSchema(conf, projection);
+ final ParquetReader<D> parquet = AvroParquetReader.<D>builder(qualifiedPath(source))
+ .disableCompatibility()
+ .withDataModel(GenericData.get())
+ .withConf(conf)
+ .build();
+ return new Iterable<D>() { // NOTE(review): the reader is never closed here, and every iterator() call reads from the same reader
+ @Override
+ public Iterator<D> iterator() {
+ return new Iterator<D>() {
+ private boolean hasNext = false; // must stay declared before next: advance() assigns it during field initialization
+ private D next = advance(); // reads the first record as soon as the iterator is created
+
+ @Override
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ @Override
+ public D next() {
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+ D toReturn = next;
+ this.next = advance(); // look ahead one record so hasNext() needs no I/O
+ return toReturn;
+ }
+
+ private D advance() {
+ try {
+ D next = parquet.read();
+ this.hasNext = (next != null); // a null record is treated as end of input
+ return next;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed while reading Parquet file: " + source, e);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not supported");
+ }
+ };
+ }
+ };
+
+ case AVRO:
+ Iterable<D> avroReader = (Iterable<D>) DataFileReader.openReader( // unchecked cast to the caller's record type D
+ openSeekable(source), new GenericDatumReader<>(projection));
+ return avroReader;
+
+ default:
+ if (source.endsWith("json")) { // neither PARQUET nor AVRO: choose by file name suffix
+ return new AvroJsonReader<>(open(source), projection);
+ } else {
+ Preconditions.checkArgument(projection == null,
+ "Cannot select columns from text files");
+ Iterable text = CharStreams.readLines(new InputStreamReader(open(source))); // NOTE(review): raw Iterable of String lines; no charset is passed, so the platform default is used, and the reader is not closed
+ return text;
+ }
+ }
+ }
+
+ protected Schema getAvroSchema(String source) throws IOException {
+ Formats.Format format;
+ try (SeekableInput in = openSeekable(source)) { // in is closed when this block exits, including on return or throw
+ format = Formats.detectFormat((InputStream) in); // cast assumes the SeekableFSDataInputStream from openSeekable is also an InputStream -- TODO confirm
+ in.seek(0); // rewinds in; the branches below reopen the source instead of reading from in
+
+ switch (format) {
+ case PARQUET:
+ return Schemas.fromParquet(getConf(), qualifiedURI(source));
+ case AVRO:
+ return Schemas.fromAvro(open(source));
+ case TEXT:
+ if (source.endsWith("avsc")) {
+ return Schemas.fromAvsc(open(source));
+ } else if (source.endsWith("json")) {
+ return Schemas.fromJSON("json", open(source));
+ }
+ default: // TEXT with neither suffix falls through to here and then to the exception below
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "Could not determine file format of %s.", source));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java
new file mode 100644
index 0000000..9c19143
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Command.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface Command {
+ /**
+ * Runs this {@code Command}.
+ *
+ * @return a return code for the process, 0 indicates success.
+ * @throws IOException if the command fails because of an I/O error
+ */
+ int run() throws IOException;
+
+ /**
+ * Returns a list of example uses. Lines starting with '#' will not have the
+ * executable name added when formatting.
+ *
+ * @return a list of String examples
+ */
+ List<String> getExamples();
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java b/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java
new file mode 100644
index 0000000..548544a
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/HadoopFileSystemURLStreamHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link URLStreamHandler} for handling Hadoop filesystem URLs,
+ * most commonly those with the <i>hdfs</i> scheme.
+ *
+ * Each handler starts with the static default {@link Configuration} that is
+ * in place when the handler is constructed; {@link #setConf(Configuration)}
+ * replaces it for that handler only. Connections opened by a handler read
+ * the URL through the Hadoop {@link FileSystem} resolved with the handler's
+ * configuration.
+ */
+public class HadoopFileSystemURLStreamHandler extends URLStreamHandler
+ implements Configurable {
+
+ private static Configuration defaultConf = new Configuration();
+
+ public static Configuration getDefaultConf() {
+ return defaultConf;
+ }
+
+ public static void setDefaultConf(Configuration defaultConf) {
+ HadoopFileSystemURLStreamHandler.defaultConf = defaultConf;
+ }
+
+ private Configuration conf = defaultConf; // snapshot taken at construction; later setDefaultConf calls do not change this handler
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ protected URLConnection openConnection(URL url) throws IOException {
+ return new HadoopFileSystemURLConnection(url);
+ }
+
+ class HadoopFileSystemURLConnection extends URLConnection {
+ public HadoopFileSystemURLConnection(URL url) {
+ super(url);
+ }
+ @Override
+ public void connect() throws IOException { // intentionally empty: the file is opened in getInputStream()
+ }
+ @Override
+ public InputStream getInputStream() throws IOException {
+ Path path = new Path(url.toExternalForm());
+ FileSystem fileSystem = path.getFileSystem(conf); // uses the enclosing handler's current configuration
+ return fileSystem.open(path);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java
new file mode 100644
index 0000000..791d169
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Help.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterDescription;
+import com.beust.jcommander.Parameters;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import java.util.List;
+
+/**
+ * The {@code help} command. With no arguments it prints the program's general
+ * usage; otherwise it prints usage, options and examples for each command
+ * named on the command line.
+ */
+@Parameters(commandDescription = "Retrieves details on the functions of other commands")
+public class Help implements Command {
+  /** Commands to describe; empty means "print the generic help". */
+  @Parameter(description = "<commands>")
+  List<String> helpCommands = Lists.newArrayList();
+
+  private final JCommander jc;
+  private final Logger console;
+  private String programName;
+
+  /**
+   * @param jc      the commander whose options and commands are described
+   * @param console the logger that all help text is written to
+   */
+  public Help(JCommander jc, Logger console) {
+    this.jc = jc;
+    this.console = console;
+  }
+
+  /**
+   * @param programName the name shown for the program in usage lines
+   */
+  public void setProgramName(String programName) {
+    this.programName = programName;
+  }
+
+  /**
+   * Prints the requested help.
+   *
+   * @return 0 on success, or 1 as soon as a name in {@code helpCommands} is
+   *         not a registered command (generic help is printed in that case)
+   */
+  @Override
+  public int run() {
+    if (helpCommands.isEmpty()) {
+      printGenericHelp();
+      return 0;
+    }
+
+    for (String cmd : helpCommands) {
+      JCommander commander = jc.getCommands().get(cmd);
+      if (commander == null) {
+        console.error("\nUnknown command: {}\n", cmd);
+        printGenericHelp();
+        return 1;
+      }
+      printCommandHelp(cmd, commander);
+    }
+    return 0;
+  }
+
+  /**
+   * Prints the program's usage line, its general options, the registered
+   * commands with their descriptions, and an example of asking for help on a
+   * command.
+   */
+  public void printGenericHelp() {
+    boolean hasRequired = false;
+    console.info(
+        "\nUsage: {} [options] [command] [command options]",
+        programName);
+    console.info("\n Options:\n");
+    for (ParameterDescription param : jc.getParameters()) {
+      // printOption must run for every option, so it is evaluated first
+      hasRequired = printOption(console, param) || hasRequired;
+    }
+    if (hasRequired) {
+      console.info("\n * = required");
+    }
+    console.info("\n Commands:\n");
+    for (String command : jc.getCommands().keySet()) {
+      console.info(" {}\n\t{}",
+          command, jc.getCommandDescription(command));
+    }
+    console.info("\n Examples:");
+    // the example must name a command that this tool actually provides
+    console.info("\n # print information for meta\n {} help meta",
+        programName);
+    console.info("\n See '{} help <command>' for more information on a " +
+        "specific command.", programName);
+  }
+
+  /** Prints usage, description, options and examples for one command. */
+  private void printCommandHelp(String cmd, JCommander commander) {
+    console.info("\nUsage: {} [general options] {} {} [command options]",
+        new Object[] {
+            programName, cmd,
+            commander.getMainParameterDescription()});
+    console.info("\n Description:");
+    console.info("\n {}", jc.getCommandDescription(cmd));
+
+    if (!commander.getParameters().isEmpty()) {
+      boolean hasRequired = false;
+      console.info("\n Command options:\n");
+      for (ParameterDescription param : commander.getParameters()) {
+        hasRequired = printOption(console, param) || hasRequired;
+      }
+      if (hasRequired) {
+        console.info("\n * = required");
+      }
+    }
+
+    List<String> examples = ((Command) commander.getObjects().get(0)).getExamples();
+    if (examples != null) {
+      console.info("\n Examples:");
+      for (String example : examples) {
+        if (example.startsWith("#")) {
+          // comment
+          console.info("\n {}", example);
+        } else {
+          console.info(" {} {} {}",
+              new Object[] {programName, cmd, example});
+        }
+      }
+    }
+    // add an extra newline in case there are more commands
+    console.info("");
+  }
+
+  /**
+   * Prints one option unless it is hidden.
+   *
+   * @return whether the option is required, even when it is hidden
+   */
+  private boolean printOption(Logger console, ParameterDescription param) {
+    boolean required = param.getParameter().required();
+    if (!param.getParameter().hidden()) {
+      console.info(" {} {}\n\t{}{}", new Object[]{
+          required ? "*" : " ",
+          param.getNames().trim(),
+          param.getDescription(),
+          formatDefault(param)});
+    }
+    return required;
+  }
+
+  /**
+   * Formats an option's default value for display; empty when there is no
+   * default or the option takes no value (arity below one).
+   */
+  private String formatDefault(ParameterDescription param) {
+    Object defaultValue = param.getDefault();
+    if (defaultValue == null || param.getParameter().arity() < 1) {
+      return "";
+    }
+    String shown = (defaultValue instanceof String) ?
+        "\"" + defaultValue + "\"" :
+        defaultValue.toString();
+    return " (default: " + shown + ")";
+  }
+
+  /** The help command has no examples of its own. */
+  @Override
+  public List<String> getExamples() {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
new file mode 100644
index 0000000..990193c
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Main.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.MissingCommandException;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.parquet.cli.commands.CSVSchemaCommand;
+import org.apache.parquet.cli.commands.CatCommand;
+import org.apache.parquet.cli.commands.CheckParquet251Command;
+import org.apache.parquet.cli.commands.ConvertCSVCommand;
+import org.apache.parquet.cli.commands.ConvertCommand;
+import org.apache.parquet.cli.commands.ParquetMetadataCommand;
+import org.apache.parquet.cli.commands.SchemaCommand;
+import org.apache.parquet.cli.commands.ShowDictionaryCommand;
+import org.apache.parquet.cli.commands.ShowPagesCommand;
+import org.apache.parquet.cli.commands.ToAvroCommand;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.apache.log4j.PropertyConfigurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Set;
+
+/**
+ * Entry point of the Parquet command-line tool. Registers the available
+ * commands with JCommander, parses the arguments, and runs the selected
+ * command with this tool's Hadoop configuration.
+ */
+@Parameters(commandDescription = "Parquet file utils")
+public class Main extends Configured implements Tool {
+
+  @VisibleForTesting
+  static final String DEFAULT_PROGRAM_NAME = "parquet";
+
+  /** Arguments that, anywhere on the command line, ask for help. */
+  private static final Set<String> HELP_ARGS = ImmutableSet.of("-h", "-help", "--help", "help");
+
+  @Parameter(names = {"-v", "--verbose", "--debug"},
+      description = "Print extra debugging information")
+  private boolean debug = false;
+
+  @VisibleForTesting
+  @Parameter(names="--dollar-zero",
+      description="A way for the runtime path to be passed in", hidden=true)
+  String programName = DEFAULT_PROGRAM_NAME;
+
+  private final Logger console;
+  private final Help help;
+
+  @VisibleForTesting
+  final JCommander jc;
+
+  /**
+   * @param console the logger that commands write their output to
+   */
+  Main(Logger console) {
+    this.console = console;
+    this.jc = new JCommander(this);
+    this.help = new Help(jc, console);
+    jc.setProgramName(DEFAULT_PROGRAM_NAME);
+    jc.addCommand("help", help, "-h", "-help", "--help");
+    jc.addCommand("meta", new ParquetMetadataCommand(console));
+    jc.addCommand("pages", new ShowPagesCommand(console));
+    jc.addCommand("dictionary", new ShowDictionaryCommand(console));
+    jc.addCommand("check-stats", new CheckParquet251Command(console));
+    jc.addCommand("schema", new SchemaCommand(console));
+    jc.addCommand("csv-schema", new CSVSchemaCommand(console));
+    jc.addCommand("convert-csv", new ConvertCSVCommand(console));
+    jc.addCommand("convert", new ConvertCommand(console));
+    jc.addCommand("to-avro", new ToAvroCommand(console));
+    jc.addCommand("cat", new CatCommand(console, 0));
+    jc.addCommand("head", new CatCommand(console, 10));
+  }
+
+  /**
+   * Parses the arguments and runs the selected command.
+   *
+   * @param args the command-line arguments
+   * @return 0 on success, 1 when parsing fails, no command is given, or the
+   *         command fails; help explicitly requested for a command returns 0
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    try {
+      jc.parse(args);
+    } catch (MissingCommandException e) {
+      console.error(e.getMessage());
+      return 1;
+    } catch (ParameterException e) {
+      help.setProgramName(programName);
+      // null when parsing failed before any command was recognized
+      String cmd = jc.getParsedCommand();
+      if (args.length == 1) {
+        // a lone argument: a command missing its required arguments, or
+        // something that is not a command at all (e.g. an unknown option)
+        if (cmd == null) {
+          console.error(e.getMessage());
+        }
+        showHelp(cmd);
+        return 1;
+      }
+      // check for variants like 'cmd --help' etc.
+      for (String arg : args) {
+        if (HELP_ARGS.contains(arg)) {
+          showHelp(cmd);
+          return 0;
+        }
+      }
+      console.error(e.getMessage());
+      return 1;
+    }
+
+    help.setProgramName(programName);
+
+    // configure log4j
+    if (debug) {
+      org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(Main.class);
+      log4jLogger.setLevel(Level.DEBUG);
+    }
+
+    String parsed = jc.getParsedCommand();
+    if (parsed == null) {
+      help.run();
+      return 1;
+    } else if ("help".equals(parsed)) {
+      return help.run();
+    }
+
+    Command command = (Command) jc.getCommands().get(parsed).getObjects().get(0);
+    if (command == null) {
+      help.run();
+      return 1;
+    }
+
+    try {
+      if (command instanceof Configurable) {
+        ((Configurable) command).setConf(getConf());
+      }
+      return command.run();
+    } catch (IllegalArgumentException e) {
+      reportError("Argument error", e);
+      return 1;
+    } catch (IllegalStateException e) {
+      reportError("State error", e);
+      return 1;
+    } catch (Exception e) {
+      console.error("Unknown error", e);
+      return 1;
+    }
+  }
+
+  /**
+   * Prints help for the given command, or the generic help when no command
+   * was recognized. A null name is never handed to {@link Help}, which would
+   * report it as the unknown command "null".
+   */
+  private void showHelp(String cmd) {
+    if (cmd != null) {
+      help.helpCommands.add(cmd);
+    }
+    help.run();
+  }
+
+  /**
+   * Logs a command failure: with the stack trace in debug mode, otherwise
+   * with just the exception message.
+   */
+  private void reportError(String label, RuntimeException e) {
+    if (debug) {
+      console.error(label, e);
+    } else {
+      console.error(label + ": {}", e.getMessage());
+    }
+  }
+
+  /**
+   * Configures logging, runs the tool through {@link ToolRunner} and exits
+   * the JVM with the tool's return code.
+   */
+  public static void main(String[] args) throws Exception {
+    // reconfigure logging with the Parquet CLI configuration
+    PropertyConfigurator.configure(
+        Main.class.getResource("/cli-logging.properties"));
+    Logger console = LoggerFactory.getLogger(Main.class);
+    // use Log4j for any libraries using commons-logging
+    LogFactory.getFactory().setAttribute(
+        "org.apache.commons.logging.Log",
+        "org.apache.commons.logging.impl.Log4JLogger");
+    int rc = ToolRunner.run(new Configuration(), new Main(console), args);
+    System.exit(rc);
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
new file mode 100644
index 0000000..860a218
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.DoubleStatistics;
+import org.apache.parquet.column.statistics.FloatStatistics;
+import org.apache.parquet.column.statistics.IntStatistics;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.Locale;
+import java.util.Set;
+
+import static org.apache.parquet.column.Encoding.BIT_PACKED;
+import static org.apache.parquet.column.Encoding.DELTA_BINARY_PACKED;
+import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import static org.apache.parquet.format.Encoding.DELTA_LENGTH_BYTE_ARRAY;
+
+
+public class Util {
+
+ private static final long KB = 1024;
+ private static final long MB = 1024 * KB;
+ private static final long GB = 1024 * MB;
+ private static final long TB = 1024 * GB;
+
+ public static String humanReadable(float bytes) {
+ if (bytes > TB) {
+ return String.format("%.03f TB", bytes / TB);
+ } else if (bytes > GB) {
+ return String.format("%.03f GB", bytes / GB);
+ } else if (bytes > MB) {
+ return String.format("%.03f MB", bytes / MB);
+ } else if (bytes > KB) {
+ return String.format("%.03f kB", bytes / KB);
+ } else {
+ return String.format("%.02f B", bytes);
+ }
+ }
+
+ public static String humanReadable(long bytes) {
+ if (bytes > TB) {
+ return String.format("%.03f TB", ((float) bytes) / TB);
+ } else if (bytes > GB) {
+ return String.format("%.03f GB", ((float) bytes) / GB);
+ } else if (bytes > MB) {
+ return String.format("%.03f MB", ((float) bytes) / MB);
+ } else if (bytes > KB) {
+ return String.format("%.03f kB", ((float) bytes) / KB);
+ } else {
+ return String.format("%d B", bytes);
+ }
+ }
+
+ public static String minMaxAsString(Statistics stats, OriginalType annotation) {
+ if (stats == null) {
+ return "no stats";
+ }
+ if (!stats.hasNonNullValue()) {
+ return "";
+ }
+ // TODO: use original types when showing decimal, timestamp, etc.
+ if (stats instanceof BooleanStatistics) {
+ return String.format("%s / %s",
+ ((BooleanStatistics) stats).getMin(),
+ ((BooleanStatistics) stats).getMax());
+ } else if (stats instanceof IntStatistics) {
+ return String.format("%d / %d",
+ ((IntStatistics) stats).getMin(),
+ ((IntStatistics) stats).getMax());
+ } else if (stats instanceof LongStatistics) {
+ return String.format("%d / %d",
+ ((LongStatistics) stats).getMin(),
+ ((LongStatistics) stats).getMax());
+ } else if (stats instanceof FloatStatistics) {
+ return String.format("%f / %f",
+ ((FloatStatistics) stats).getMin(),
+ ((FloatStatistics) stats).getMax());
+ } else if (stats instanceof DoubleStatistics) {
+ return String.format("%f / %f",
+ ((DoubleStatistics) stats).getMin(),
+ ((DoubleStatistics) stats).getMax());
+ } else if (stats instanceof BinaryStatistics) {
+ byte[] minBytes = stats.getMinBytes();
+ byte[] maxBytes = stats.getMaxBytes();
+ return String.format("%s / %s",
+ printable(minBytes, annotation == OriginalType.UTF8, 30),
+ printable(maxBytes, annotation == OriginalType.UTF8, 30));
+ } else {
+ throw new RuntimeException("Unknown stats type: " + stats);
+ }
+ }
+
+ public static String toString(Statistics stats, long count, OriginalType annotation) {
+ if (stats == null) {
+ return "no stats";
+ }
+ // TODO: use original types when showing decimal, timestamp, etc.
+ if (stats instanceof BooleanStatistics) {
+ return String.format("nulls: %d/%d", stats.getNumNulls(), count);
+ } else if (stats instanceof IntStatistics) {
+ return String.format("min: %d max: %d nulls: %d/%d",
+ ((IntStatistics) stats).getMin(), ((IntStatistics) stats).getMax(),
+ stats.getNumNulls(), count);
+ } else if (stats instanceof LongStatistics) {
+ return String.format("min: %d max: %d nulls: %d/%d",
+ ((LongStatistics) stats).getMin(), ((LongStatistics) stats).getMax(),
+ stats.getNumNulls(), count);
+ } else if (stats instanceof FloatStatistics) {
+ return String.format("min: %f max: %f nulls: %d/%d",
+ ((FloatStatistics) stats).getMin(),
+ ((FloatStatistics) stats).getMax(),
+ stats.getNumNulls(), count);
+ } else if (stats instanceof DoubleStatistics) {
+ return String.format("min: %f max: %f nulls: %d/%d",
+ ((DoubleStatistics) stats).getMin(),
+ ((DoubleStatistics) stats).getMax(),
+ stats.getNumNulls(), count);
+ } else if (stats instanceof BinaryStatistics) {
+ byte[] minBytes = stats.getMinBytes();
+ byte[] maxBytes = stats.getMaxBytes();
+ return String.format("min: %s max: %s nulls: %d/%d",
+ printable(minBytes, annotation == OriginalType.UTF8, 30),
+ printable(maxBytes, annotation == OriginalType.UTF8, 30),
+ stats.getNumNulls(), count);
+ } else {
+ throw new RuntimeException("Unknown stats type: " + stats);
+ }
+ }
+
+ private static String printable(byte[] bytes, boolean isUtf8, int len) {
+ if (bytes == null) {
+ return "null";
+ } else if (isUtf8) {
+ return humanReadable(new String(bytes, StandardCharsets.UTF_8), len);
+ } else {
+ return humanReadable(bytes, len);
+ }
+ }
+
+ public static String humanReadable(String str, int len) {
+ if (str == null) {
+ return "null";
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("\"");
+ if (str.length() > len - 2) {
+ sb.append(str.substring(0, len - 5)).append("...");
+ } else {
+ sb.append(str);
+ }
+ sb.append("\"");
+
+ return sb.toString();
+ }
+
+ public static String humanReadable(byte[] bytes, int len) {
+ if (bytes == null || bytes.length == 0) {
+ return "null";
+ }
+
+ StringBuilder sb = new StringBuilder();
+ String asString = Hex.encodeHexString(bytes);
+ sb.append("0x");
+ if (asString.length() > len - 2) {
+ sb.append(asString.substring(0, (len - 5) / 2)).append("...");
+ } else {
+ sb.append(asString);
+ }
+
+ return sb.toString();
+ }
+
+ public static String shortCodec(CompressionCodecName codec) {
+ switch (codec) {
+ case UNCOMPRESSED:
+ return "_";
+ case SNAPPY:
+ return "S";
+ case GZIP:
+ return "G";
+ case LZO:
+ return "L";
+ default:
+ return "?";
+ }
+ }
+
+ public static String encodingAsString(Encoding encoding, boolean isDict) {
+ switch (encoding) {
+ case PLAIN:
+ return "_";
+ case PLAIN_DICTIONARY:
+ // data pages use RLE, dictionary pages use plain
+ return isDict ? "_" : "R";
+ case RLE_DICTIONARY:
+ return "R";
+ case DELTA_BINARY_PACKED:
+ case DELTA_LENGTH_BYTE_ARRAY:
+ case DELTA_BYTE_ARRAY:
+ return "D";
+ default:
+ return "?";
+ }
+ }
+
+ public static String encodingStatsAsString(EncodingStats encodingStats) {
+ StringBuilder sb = new StringBuilder();
+ if (encodingStats.hasDictionaryPages()) {
+ for (Encoding encoding: encodingStats.getDictionaryEncodings()) {
+ sb.append(encodingAsString(encoding, true));
+ }
+ sb.append(" ");
+ } else {
+ sb.append(" ");
+ }
+
+ Set<Encoding> encodings = encodingStats.getDataEncodings();
+ if (encodings.contains(RLE_DICTIONARY) || encodings.contains(PLAIN_DICTIONARY)) {
+ sb.append("R");
+ }
+ if (encodings.contains(PLAIN)) {
+ sb.append("_");
+ }
+ if (encodings.contains(DELTA_BYTE_ARRAY) ||
+ encodings.contains(DELTA_BINARY_PACKED) ||
+ encodings.contains(DELTA_LENGTH_BYTE_ARRAY)) {
+ sb.append("D");
+ }
+
+ // Check for fallback and add a flag
+ if (encodingStats.hasDictionaryEncodedPages() && encodingStats.hasNonDictionaryEncodedPages()) {
+ sb.append(" F");
+ }
+
+ return sb.toString();
+ }
+
+ public static String encodingsAsString(Set<Encoding> encodings, ColumnDescriptor desc) {
+ StringBuilder sb = new StringBuilder();
+ if (encodings.contains(RLE) || encodings.contains(BIT_PACKED)) {
+ sb.append(desc.getMaxDefinitionLevel() == 0 ? "B" : "R");
+ sb.append(desc.getMaxRepetitionLevel() == 0 ? "B" : "R");
+ if (encodings.contains(PLAIN_DICTIONARY)) {
+ sb.append("R");
+ }
+ if (encodings.contains(PLAIN)) {
+ sb.append("_");
+ }
+ } else {
+ sb.append("RR");
+ if (encodings.contains(RLE_DICTIONARY)) {
+ sb.append("R");
+ }
+ if (encodings.contains(PLAIN)) {
+ sb.append("_");
+ }
+ if (encodings.contains(DELTA_BYTE_ARRAY) ||
+ encodings.contains(DELTA_BINARY_PACKED) ||
+ encodings.contains(DELTA_LENGTH_BYTE_ARRAY)) {
+ sb.append("D");
+ }
+ }
+ return sb.toString();
+ }
+
+ private static final Splitter DOT = Splitter.on('.');
+
+ public static ColumnDescriptor descriptor(String column, MessageType schema) {
+ String[] path = Iterables.toArray(DOT.split(column), String.class);
+ Preconditions.checkArgument(schema.containsPath(path),
+ "Schema doesn't have column: " + column);
+ return schema.getColumnDescription(path);
+ }
+
+ public static String columnName(ColumnDescriptor desc) {
+ return Joiner.on('.').join(desc.getPath());
+ }
+
+ public static PrimitiveType primitive(MessageType schema, String[] path) {
+ Type current = schema;
+ for (String part : path) {
+ current = current.asGroupType().getType(part);
+ if (current.isPrimitive()) {
+ return current.asPrimitiveType();
+ }
+ }
+ return null;
+ }
+
+ public static PrimitiveType primitive(String column, MessageType schema) {
+ String[] path = Iterables.toArray(DOT.split(column), String.class);
+ Preconditions.checkArgument(schema.containsPath(path),
+ "Schema doesn't have column: " + column);
+ return primitive(schema, path);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java
new file mode 100644
index 0000000..4fbfb9b
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CSVSchemaCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.csv.CSVProperties;
+import org.apache.parquet.cli.csv.AvroCSV;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Command that infers a schema from a sample CSV file and writes it to the
+ * console or to the path given with {@code --output}.
+ */
+@Parameters(commandDescription="Build a schema from a CSV data sample")
+public class CSVSchemaCommand extends BaseCommand {
+
+  @Parameter(description="<sample csv path>")
+  List<String> samplePaths;
+
+  @Parameter(names={"-o", "--output"}, description="Save schema avsc to path")
+  String outputPath = null;
+
+  @Parameter(names={"--class", "--record-name"}, required = true,
+      description="A name or class for the result schema")
+  String recordName = null;
+
+  @Parameter(names="--minimize",
+      description="Minimize schema file size by eliminating white space")
+  boolean minimize=false;
+
+  @Parameter(names="--delimiter", description="Delimiter character")
+  String delimiter = ",";
+
+  @Parameter(names="--escape", description="Escape character")
+  String escape = "\\";
+
+  @Parameter(names="--quote", description="Quote character")
+  String quote = "\"";
+
+  @Parameter(names="--no-header", description="Don't use first line as CSV header")
+  boolean noHeader = false;
+
+  @Parameter(names="--skip-lines", description="Lines to skip before CSV start")
+  int linesToSkip = 0;
+
+  @Parameter(names="--charset", description="Character set name", hidden = true)
+  String charsetName = Charset.defaultCharset().displayName();
+
+  @Parameter(names="--header",
+      description="Line to use as a header. Must match the CSV settings.")
+  String header;
+
+  @Parameter(names="--require",
+      description="Do not allow null values for the given field")
+  List<String> requiredFields;
+
+  /**
+   * @param console the logger that the schema is printed to when no output
+   *                path is given
+   */
+  public CSVSchemaCommand(Logger console) {
+    super(console);
+  }
+
+  /**
+   * Infers the schema of the single sample file and outputs it.
+   *
+   * @return 0 on success
+   * @throws IllegalArgumentException if no sample path, or more than one,
+   *                                  is given
+   * @throws IOException              declared for failures reading the
+   *                                  sample or writing the schema
+   */
+  @Override
+  public int run() throws IOException {
+    Preconditions.checkArgument(samplePaths != null && !samplePaths.isEmpty(),
+        "Sample CSV path is required");
+    Preconditions.checkArgument(samplePaths.size() == 1,
+        "Only one CSV sample can be given");
+
+    // a header given on the command line replaces the one in the file, so
+    // do not assume that the file's first line is a header
+    if (header != null) {
+      noHeader = true;
+    }
+
+    final CSVProperties props = new CSVProperties.Builder()
+        .delimiter(delimiter)
+        .escape(escape)
+        .quote(quote)
+        .header(header)
+        .hasHeader(!noHeader)
+        .linesToSkip(linesToSkip)
+        .charset(charsetName)
+        .build();
+
+    final Set<String> required = requiredFields == null
+        ? ImmutableSet.<String>of()
+        : ImmutableSet.copyOf(requiredFields);
+
+    final String samplePath = samplePaths.get(0);
+    final boolean pretty = !minimize;
+
+    // assume fields are nullable by default, users can easily change this
+    String sampleSchema = AvroCSV
+        .inferNullableSchema(recordName, open(samplePath), props, required)
+        .toString(pretty);
+
+    output(sampleSchema, console, outputPath);
+
+    return 0;
+  }
+
+  /**
+   * @return usage examples; entries starting with '#' are comments
+   */
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Print the schema for samples.csv to standard out:",
+        "samples.csv --record-name Sample",
+        "# Write schema to sample.avsc:",
+        "samples.csv -o sample.avsc --record-name Sample"
+    );
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
new file mode 100644
index 0000000..7703e88
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.avro.Schema;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Expressions;
+import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.cli.util.Expressions.select;
+
+/**
+ * Command that prints records from a data file, optionally projected to a
+ * list of columns. The record limit defaults to the value given at
+ * construction; a limit of zero or less prints every record.
+ */
+@Parameters(commandDescription = "Print the first N records from a file")
+public class CatCommand extends BaseCommand {
+
+  @Parameter(description = "<file>")
+  List<String> sourceFiles;
+
+  @Parameter(names={"-n", "--num-records"},
+      description="The number of records to print")
+  long numRecords;
+
+  @Parameter(
+      names = {"-c", "--column", "--columns"},
+      description = "List of columns")
+  List<String> columns;
+
+  /**
+   * @param console           the logger that records are printed to
+   * @param defaultNumRecords the record limit used when {@code -n} is not
+   *                          given
+   */
+  public CatCommand(Logger console, long defaultNumRecords) {
+    super(console);
+    this.numRecords = defaultNumRecords;
+  }
+
+  /**
+   * Prints the records of the single source file, one per log line. When
+   * exactly one column is selected, only that column's value is printed.
+   *
+   * @return 0 on success
+   * @throws IllegalArgumentException if no file, or more than one, is given
+   * @throws RuntimeException         wrapping any runtime failure, with the
+   *                                  number of records printed before it
+   */
+  @Override
+  public int run() throws IOException {
+    Preconditions.checkArgument(
+        sourceFiles != null && !sourceFiles.isEmpty(),
+        "Missing file name");
+    Preconditions.checkArgument(sourceFiles.size() == 1,
+        "Only one file can be given");
+
+    final String source = sourceFiles.get(0);
+    final Schema projection =
+        Expressions.filterSchema(getAvroSchema(source), columns);
+    final Iterable<Object> reader = openDataFile(source, projection);
+
+    final boolean singleColumn = columns != null && columns.size() == 1;
+    final boolean limited = numRecords > 0;
+
+    long count = 0;
+    // stays true unless the loop completes, so close() knows whether to
+    // swallow its own IOException in favor of the one already in flight
+    boolean threw = true;
+    try {
+      for (Object record : reader) {
+        if (limited && count >= numRecords) {
+          break;
+        }
+        String line = singleColumn
+            ? String.valueOf(select(projection, record, columns.get(0)))
+            : String.valueOf(record);
+        console.info(line);
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count, e);
+    } finally {
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * @return usage examples; entries starting with '#' are comments
+   */
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Show the first 10 records in file \"data.avro\":",
+        "data.avro",
+        "# Show the first 50 records in file \"data.parquet\":",
+        "data.parquet -n 50"
+    );
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
new file mode 100644
index 0000000..8f60821
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.CorruptStatistics;
+import org.apache.parquet.Version;
+import org.apache.parquet.VersionParser;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.util.DynConstructors;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
+import org.slf4j.Logger;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+@Parameters(commandDescription = "Check Parquet files for corrupt page and column stats (PARQUET-251)")
+public class CheckParquet251Command extends BaseCommand {
+
+ /**
+ * Creates the command.
+ *
+ * @param console logger handed to the superclass constructor
+ */
+ public CheckParquet251Command(Logger console) {
+ super(console);
+ }
+
+ // positional arguments: the files to check; each one is passed to check()
+ @Parameter(description = "<files>", required = true)
+ List<String> files;
+
+ /**
+  * Checks every given file and logs one line per file saying whether
+  * corrupt stats were found.
+  *
+  * @return 1 if at least one file has corrupt stats, 0 otherwise
+  * @throws IOException if a file cannot be read
+  */
+ @Override
+ public int run() throws IOException {
+   boolean anyCorrupt = false;
+   for (String file : files) {
+     final String problem = check(file);
+     if (problem == null) {
+       console.info("{} has no corrupt stats", file);
+       continue;
+     }
+     anyCorrupt = true;
+     console.info("{} has corrupt stats: {}", file, problem);
+   }
+
+   return anyCorrupt ? 1 : 0;
+ }
+
+ /**
+  * Checks one file for corrupt page statistics.
+  *
+  * The data is only scanned when CorruptStatistics flags the file's
+  * created-by string for BINARY columns. In that case every page of every
+  * BINARY column is re-read and validated against its page-header stats.
+  *
+  * @param file path of the file to check
+  * @return the validation failure message, or null when the file was not
+  *         flagged or no mismatch was found
+  * @throws IOException if the footer or the row groups cannot be read, or
+  *         the reader cannot be closed
+  */
+ private String check(String file) throws IOException {
+   Path path = qualifiedPath(file);
+   ParquetMetadata footer = ParquetFileReader.readFooter(
+       getConf(), path, ParquetMetadataConverter.NO_FILTER);
+
+   FileMetaData meta = footer.getFileMetaData();
+   String createdBy = meta.getCreatedBy();
+   if (CorruptStatistics.shouldIgnoreStatistics(createdBy, BINARY)) {
+     // create fake metadata that will read corrupt stats and return them
+     FileMetaData fakeMeta = new FileMetaData(
+         meta.getSchema(), meta.getKeyValueMetaData(), Version.FULL_VERSION);
+
+     // get just the binary columns
+     List<ColumnDescriptor> columns = Lists.newArrayList();
+     Iterables.addAll(columns, Iterables.filter(
+         meta.getSchema().getColumns(),
+         new Predicate<ColumnDescriptor>() {
+           @Override
+           public boolean apply(@Nullable ColumnDescriptor input) {
+             return input != null && input.getType() == BINARY;
+           }
+         }));
+
+     // now check to see if the data is actually corrupt; try-with-resources
+     // closes the reader on every path, including the early return below
+     try (ParquetFileReader reader = new ParquetFileReader(getConf(),
+         fakeMeta, path, footer.getBlocks(), columns)) {
+       PageStatsValidator validator = new PageStatsValidator();
+       for (PageReadStore pages = reader.readNextRowGroup(); pages != null;
+            pages = reader.readNextRowGroup()) {
+         validator.validate(columns, pages);
+       }
+     } catch (BadStatsException e) {
+       return e.getMessage();
+     }
+   }
+
+   return null;
+ }
+
+ /**
+ * Returns usage examples for this command: a "#"-prefixed description line
+ * followed by the example arguments.
+ */
+ @Override
+ public List<String> getExamples() {
+ return Arrays.asList(
+ "# Check file1.parquet for corrupt page and column stats",
+ "file1.parquet");
+ }
+
+
+ /**
+ * Thrown by the validators in this class when page statistics do not match
+ * the page's values; check() catches it and returns its message.
+ */
+ public static class BadStatsException extends RuntimeException {
+ public BadStatsException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * A PageReader backed by one dictionary page (possibly null) and one data
+ * page, used to replay a single page through a ColumnReader.
+ */
+ public class SingletonPageReader implements PageReader {
+ private final DictionaryPage dict;
+ private final DataPage data;
+
+ public SingletonPageReader(DictionaryPage dict, DataPage data) {
+ this.dict = dict;
+ this.data = data;
+ }
+
+ @Override
+ public DictionaryPage readDictionaryPage() {
+ return dict;
+ }
+
+ // the total is the value count of the single data page
+ @Override
+ public long getTotalValueCount() {
+ return data.getValueCount();
+ }
+
+ // returns the same page on every call and never null; callers bound their
+ // reads with getTotalValueCount()
+ @Override
+ public DataPage readPage() {
+ return data;
+ }
+ }
+
+ /**
+ * Returns the statistics carried by a data page, for either page version.
+ *
+ * The caller chooses T; the casts are unchecked, so T must match the
+ * column's value type.
+ *
+ * @param page a v1 or v2 data page
+ * @return the page's statistics, cast to Statistics of T
+ */
+ private static <T extends Comparable<T>>
+ Statistics<T> getStatisticsFromPageHeader(DataPage page) {
+ return page.accept(new DataPage.Visitor<Statistics<T>>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public Statistics<T> visit(DataPageV1 dataPageV1) {
+ return (Statistics<T>) dataPageV1.getStatistics();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Statistics<T> visit(DataPageV2 dataPageV2) {
+ return (Statistics<T>) dataPageV2.getStatistics();
+ }
+ });
+ }
+
+ /**
+  * Checks individual values against the min and max recorded in a page's
+  * statistics. When the statistics report no non-null value there is
+  * nothing to compare against and every value is accepted.
+  */
+ private class StatsValidator<T extends Comparable<T>> {
+   private final boolean hasNonNull;
+   private final T min;
+   private final T max;
+
+   public StatsValidator(DataPage page) {
+     Statistics<T> pageStats = getStatisticsFromPageHeader(page);
+     this.hasNonNull = pageStats.hasNonNullValue();
+     // min and max are only read when the stats contain a non-null value
+     this.min = hasNonNull ? pageStats.genericGetMin() : null;
+     this.max = hasNonNull ? pageStats.genericGetMax() : null;
+   }
+
+   /**
+    * @param value a non-null value read from the page
+    * @throws BadStatsException if value lies outside [min, max]
+    */
+   public void validate(T value) {
+     if (!hasNonNull) {
+       return;
+     }
+     if (min.compareTo(value) > 0) {
+       throw new BadStatsException("Min should be <= all values.");
+     }
+     if (max.compareTo(value) < 0) {
+       throw new BadStatsException("Max should be >= all values.");
+     }
+   }
+ }
+
+ /**
+ * Builds a PrimitiveConverter that validates every value it receives
+ * against the statistics of the given page.
+ *
+ * Each branch creates a StatsValidator typed for the column's primitive
+ * type and overrides only the matching add method of PrimitiveConverter.
+ *
+ * @param page the data page whose statistics are the reference
+ * @param type the primitive type of the column being read
+ * @return a converter whose add method throws BadStatsException for a value
+ * outside the page's min/max
+ */
+ private PrimitiveConverter getValidatingConverter(
+ final DataPage page, PrimitiveTypeName type) {
+ return type.convert(new PrimitiveTypeNameConverter<PrimitiveConverter, RuntimeException>() {
+ @Override
+ public PrimitiveConverter convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+ final StatsValidator<Float> validator = new StatsValidator<Float>(page);
+ return new PrimitiveConverter() {
+ @Override
+ public void addFloat(float value) {
+ validator.validate(value);
+ }
+ };
+ }
+
+ @Override
+ public PrimitiveConverter convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+ final StatsValidator<Double> validator = new StatsValidator<Double>(page);
+ return new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ validator.validate(value);
+ }
+ };
+ }
+
+ @Override
+ public PrimitiveConverter convertINT32(PrimitiveTypeName primitiveTypeName) {
+ final StatsValidator<Integer> validator = new StatsValidator<Integer>(page);
+ return new PrimitiveConverter() {
+ @Override
+ public void addInt(int value) {
+ validator.validate(value);
+ }
+ };
+ }
+
+ @Override
+ public PrimitiveConverter convertINT64(PrimitiveTypeName primitiveTypeName) {
+ final StatsValidator<Long> validator = new StatsValidator<Long>(page);
+ return new PrimitiveConverter() {
+ @Override
+ public void addLong(long value) {
+ validator.validate(value);
+ }
+ };
+ }
+
+ @Override
+ public PrimitiveConverter convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+ final StatsValidator<Boolean> validator = new StatsValidator<Boolean>(page);
+ return new PrimitiveConverter() {
+ @Override
+ public void addBoolean(boolean value) {
+ validator.validate(value);
+ }
+ };
+ }
+
+ // INT96 values are validated through the Binary converter
+ @Override
+ public PrimitiveConverter convertINT96(PrimitiveTypeName primitiveTypeName) {
+ return convertBINARY(primitiveTypeName);
+ }
+
+ // fixed-length byte arrays are validated through the Binary converter
+ @Override
+ public PrimitiveConverter convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
+ return convertBINARY(primitiveTypeName);
+ }
+
+ @Override
+ public PrimitiveConverter convertBINARY(PrimitiveTypeName primitiveTypeName) {
+ final StatsValidator<Binary> validator = new StatsValidator<Binary>(page);
+ return new PrimitiveConverter() {
+ @Override
+ public void addBinary(Binary value) {
+ validator.validate(value);
+ }
+ };
+ }
+ });
+ }
+
+ // Constructor handle for org.apache.parquet.column.impl.ColumnReaderImpl,
+ // resolved by class name with the (ColumnDescriptor, PageReader,
+ // PrimitiveConverter, ParsedVersion) signature.
+ // NOTE(review): hiddenImpl presumably exists because that constructor is not
+ // publicly accessible -- confirm against DynConstructors.
+ private static final DynConstructors.Ctor<ColumnReader> COL_READER_CTOR =
+ new DynConstructors.Builder(ColumnReader.class)
+ .hiddenImpl("org.apache.parquet.column.impl.ColumnReaderImpl",
+ ColumnDescriptor.class, PageReader.class,
+ PrimitiveConverter.class, VersionParser.ParsedVersion.class)
+ .build();
+
+ /**
+  * Validates the page-header statistics of every page in a row group by
+  * re-reading the page's values and comparing them with the recorded
+  * min, max and null count.
+  */
+ public class PageStatsValidator {
+   /**
+    * Validates all pages of the given columns in one row group.
+    *
+    * @param columns the columns to validate
+    * @param store the row group's pages
+    * @throws BadStatsException if any page's statistics disagree with its data
+    * @throws ParquetDecodingException if a dictionary page cannot be read
+    */
+   public void validate(List<ColumnDescriptor> columns, PageReadStore store) {
+     for (ColumnDescriptor desc : columns) {
+       PageReader reader = store.getPageReader(desc);
+       DictionaryPage dict = reader.readDictionaryPage();
+       DictionaryPage reusableDict = null;
+       if (dict != null) {
+         try {
+           // copy the dictionary into a byte array so the same page can be
+           // handed to a fresh reader for every data page
+           reusableDict = new DictionaryPage(
+               BytesInput.from(dict.getBytes().toByteArray()),
+               dict.getDictionarySize(), dict.getEncoding());
+         } catch (IOException e) {
+           throw new ParquetDecodingException("Cannot read dictionary", e);
+         }
+       }
+       DataPage page;
+       while ((page = reader.readPage()) != null) {
+         validateStatsForPage(page, reusableDict, desc);
+       }
+     }
+   }
+
+   /**
+    * Replays one page through a column reader. Non-null values go to a
+    * validating converter (min/max check); nulls are counted and compared
+    * with the null count in the page statistics.
+    */
+   private void validateStatsForPage(DataPage page, DictionaryPage dict,
+                                     ColumnDescriptor desc) {
+     SingletonPageReader reader = new SingletonPageReader(dict, page);
+     PrimitiveConverter converter = getValidatingConverter(page, desc.getType());
+     Statistics stats = getStatisticsFromPageHeader(page);
+
+     long numNulls = 0;
+
+     ColumnReader column = COL_READER_CTOR.newInstance(desc, reader, converter, null);
+     // the count is fixed for a single page, so read it once
+     final long valueCount = reader.getTotalValueCount();
+     for (int i = 0; i < valueCount; i += 1) {
+       // a definition level below the maximum means the value is null
+       if (column.getCurrentDefinitionLevel() >= desc.getMaxDefinitionLevel()) {
+         column.writeCurrentValueToConverter();
+       } else {
+         numNulls += 1;
+       }
+       column.consume();
+     }
+
+     if (numNulls != stats.getNumNulls()) {
+       throw new BadStatsException("Number of nulls doesn't match.");
+     }
+
+     // this runs once per page: skip building the message (page.toString(),
+     // Arrays.toString, String.format) unless debug output is enabled
+     if (console.isDebugEnabled()) {
+       console.debug(String.format(
+           "Validated stats min=%s max=%s nulls=%d for page=%s col=%s",
+           String.valueOf(stats.genericGetMin()),
+           String.valueOf(stats.genericGetMax()), stats.getNumNulls(), page,
+           Arrays.toString(desc.getPath())));
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
new file mode 100644
index 0000000..624ba91
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCSVCommand.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.csv.AvroCSVReader;
+import org.apache.parquet.cli.csv.CSVProperties;
+import org.apache.parquet.cli.csv.AvroCSV;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.cli.util.Codecs;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+@Parameters(commandDescription="Create a file from CSV data")
+public class ConvertCSVCommand extends BaseCommand {
+
+ /**
+ * Creates the command.
+ *
+ * @param console logger handed to the superclass constructor
+ */
+ public ConvertCSVCommand(Logger console) {
+ super(console);
+ }
+
+ // positional arguments; run() requires exactly one CSV path
+ @Parameter(description="<csv path>")
+ List<String> targets;
+
+ // destination of the Parquet file written by run()
+ @Parameter(
+ names={"-o", "--output"},
+ description="Output file path",
+ required=true)
+ String outputPath = null;
+
+ // selects writer version PARQUET_2_0 instead of PARQUET_1_0 in run()
+ @Parameter(
+ names={"-2", "--format-version-2", "--writer-version-2"},
+ description="Use Parquet format version 2",
+ hidden = true)
+ boolean v2 = false;
+
+ // CSV parsing settings, all passed to CSVProperties.Builder in run()
+ @Parameter(names="--delimiter", description="Delimiter character")
+ String delimiter = ",";
+
+ @Parameter(names="--escape", description="Escape character")
+ String escape = "\\";
+
+ @Parameter(names="--quote", description="Quote character")
+ String quote = "\"";
+
+ // run() forces this to true whenever --header is given
+ @Parameter(names="--no-header", description="Don't use first line as CSV header")
+ boolean noHeader = false;
+
+ @Parameter(names="--skip-lines", description="Lines to skip before CSV start")
+ int linesToSkip = 0;
+
+ // defaults to the JVM's default charset
+ @Parameter(names="--charset", description="Character set name", hidden = true)
+ String charsetName = Charset.defaultCharset().displayName();
+
+ @Parameter(names="--header",
+ description="Line to use as a header. Must match the CSV settings.")
+ String header;
+
+ // only consulted when the schema is inferred, i.e. when no --schema is given
+ @Parameter(names="--require",
+ description="Do not allow null values for the given field")
+ List<String> requiredFields;
+
+ // when set, the schema is read from this file instead of being inferred
+ // from the CSV data
+ @Parameter(names = {"-s", "--schema"},
+ description = "The file containing the Avro schema.")
+ String avroSchemaFile;
+
+ // resolved to a CompressionCodecName by Codecs.parquetCodec in run()
+ @Parameter(names = {"--compression-codec"},
+ description = "A compression codec name.")
+ String compressionCodecName = "GZIP";
+
+ @Parameter(names="--row-group-size", description="Target row group size")
+ int rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
+
+ @Parameter(names="--page-size", description="Target page size")
+ int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+ // NOTE(review): defaults to DEFAULT_PAGE_SIZE, not a dictionary-specific
+ // constant -- confirm this is intended
+ @Parameter(names="--dictionary-size", description="Max dictionary page size")
+ int dictionaryPageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+ // when set, run() opens the writer in OVERWRITE mode instead of CREATE; the
+ // help text now describes the output file rather than a "view or dataset",
+ // which this command does not have
+ @Parameter(
+     names={"--overwrite"},
+     description="Overwrite the output file if it already exists")
+ boolean overwrite = false;
+
+ /**
+  * Converts the single CSV source into a Parquet file at the output path.
+  *
+  * The Avro schema is read from the schema file when one is given.
+  * Otherwise a nullable schema is inferred from the CSV data, named after
+  * the source file name up to its first dot, with the required fields
+  * marked non-null.
+  *
+  * @return 0 on success
+  * @throws IOException if the source, schema or output cannot be opened,
+  *         read, written or closed
+  * @throws IllegalArgumentException unless exactly one CSV path is given
+  * @throws RuntimeException wrapping any runtime failure during conversion,
+  *         with the zero-based index of the failing record in the message
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+   Preconditions.checkArgument(targets != null && targets.size() == 1,
+       "CSV path is required.");
+
+   if (header != null) {
+     // if a header is given on the command line, don't assume one is in the file
+     noHeader = true;
+   }
+
+   CSVProperties props = new CSVProperties.Builder()
+       .delimiter(delimiter)
+       .escape(escape)
+       .quote(quote)
+       .header(header)
+       .hasHeader(!noHeader)
+       .linesToSkip(linesToSkip)
+       .charset(charsetName)
+       .build();
+
+   String source = targets.get(0);
+
+   Schema csvSchema;
+   if (avroSchemaFile != null) {
+     csvSchema = Schemas.fromAvsc(open(avroSchemaFile));
+   } else {
+     Set<String> required = ImmutableSet.of();
+     if (requiredFields != null) {
+       required = ImmutableSet.copyOf(requiredFields);
+     }
+
+     // record name is the file name up to its first dot
+     String filename = new File(source).getName();
+     String recordName;
+     if (filename.contains(".")) {
+       recordName = filename.substring(0, filename.indexOf("."));
+     } else {
+       recordName = filename;
+     }
+
+     csvSchema = AvroCSV.inferNullableSchema(
+         recordName, open(source), props, required);
+   }
+
+   // number of records written so far, i.e. the index of the record in flight
+   long count = 0;
+   try (AvroCSVReader<Record> reader = new AvroCSVReader<>(
+       open(source), props, csvSchema, Record.class, true)) {
+     CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
+     try (ParquetWriter<Record> writer = AvroParquetWriter
+         .<Record>builder(qualifiedPath(outputPath))
+         .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
+         .withWriteMode(overwrite ?
+             ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
+         .withCompressionCodec(codec)
+         .withDictionaryEncoding(true)
+         .withDictionaryPageSize(dictionaryPageSize)
+         .withPageSize(pageSize)
+         .withRowGroupSize(rowGroupSize)
+         .withDataModel(GenericData.get())
+         .withConf(getConf())
+         .withSchema(csvSchema)
+         .build()) {
+       for (Record record : reader) {
+         writer.write(record);
+         // without this increment the message below always said "record 0"
+         count += 1;
+       }
+     } catch (RuntimeException e) {
+       throw new RuntimeException("Failed on record " + count, e);
+     }
+   }
+
+   return 0;
+ }
+
+ /**
+  * Returns usage examples for this command: "#"-prefixed description lines,
+  * each followed by the example arguments.
+  *
+  * The examples match the declared options: the CSV source is the only
+  * positional argument and the destination is given with -o. The command
+  * declares no --format option and always writes Parquet.
+  */
+ @Override
+ public List<String> getExamples() {
+   return Lists.newArrayList(
+       "# Create a Parquet file from a CSV file",
+       "sample.csv -o sample.parquet --schema schema.avsc",
+       "# Create a Parquet file in HDFS from local CSV",
+       "path/to/sample.csv -o hdfs:/user/me/sample.parquet --schema schema.avsc",
+       "# Create a Parquet file from CSV data in S3",
+       "s3:/data/path/sample.csv -o sample.parquet --schema s3:/schemas/schema.avsc"
+   );
+ }
+}