You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2017/07/28 23:25:28 UTC
[3/4] parquet-mr git commit: PARQUET-777: Add Parquet CLI.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java
new file mode 100644
index 0000000..7f82874
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Codecs;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.apache.parquet.cli.util.Expressions.filterSchema;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+@Parameters(commandDescription="Create a Parquet file from a data file")
+public class ConvertCommand extends BaseCommand {
+ /** Creates the command and passes {@code console} on to the {@link BaseCommand} constructor. */
+ public ConvertCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(description = "<file>")
+ List<String> targets;
+
+ @Parameter(
+ names={"-o", "--output"},
+ description="Output file path",
+ required=true)
+ String outputPath = null;
+
+ @Parameter(names = {"-s", "--schema"},
+ description = "The file containing the Avro schema.")
+ String avroSchemaFile;
+
+ @Parameter(
+ names = {"-c", "--column", "--columns"},
+ description = "List of columns")
+ List<String> columns;
+
+ @Parameter(names = {"--compression-codec"},
+ description = "A compression codec name.")
+ String compressionCodecName = "GZIP";
+
+ @Parameter(
+ names={"--overwrite"},
+ description="Overwrite the output file if it exists")
+ boolean overwrite = false;
+
+ @Parameter(
+ names={"-2", "--format-version-2", "--writer-version-2"},
+ description="Use Parquet format version 2",
+ hidden = true)
+ boolean v2 = false;
+
+ @Parameter(names="--row-group-size", description="Target row group size")
+ int rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
+
+ @Parameter(names="--page-size", description="Target page size")
+ int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+ @Parameter(names="--dictionary-size", description="Max dictionary page size")
+ int dictionaryPageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+ /** Converts the single input data file into a Parquet file at {@code outputPath}; returns 0. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+   Preconditions.checkArgument(targets != null && targets.size() == 1,
+       "A data file is required.");
+   // Exactly one positional argument is accepted: the file to convert.
+   String source = targets.get(0);
+   // Map the --compression-codec name to a Parquet codec.
+   CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
+   // An explicit --schema file wins; otherwise the schema is taken from the source file.
+   Schema schema;
+   if (avroSchemaFile != null) {
+     schema = Schemas.fromAvsc(open(avroSchemaFile));
+   } else {
+     schema = getAvroSchema(source);
+   }
+   Schema projection = filterSchema(schema, columns);
+   // Qualify the output path once and reuse it for both the cleanup and the writer.
+   Path outPath = qualifiedPath(outputPath);
+   FileSystem outFS = outPath.getFileSystem(getConf());
+   if (overwrite && outFS.exists(outPath)) {
+     console.debug("Deleting output file {} (already exists)", outPath);
+     outFS.delete(outPath, true); // same effect as the deprecated delete(Path)
+   }
+   // 'threw' stays true until all records are written, so a close() error cannot mask a failure.
+   Iterable<Record> reader = openDataFile(source, projection);
+   boolean threw = true;
+   long count = 0;
+   try {
+     try (ParquetWriter<Record> writer = AvroParquetWriter
+         .<Record>builder(outPath)
+         .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
+         .withConf(getConf())
+         .withCompressionCodec(codec)
+         .withRowGroupSize(rowGroupSize)
+         .withDictionaryPageSize(dictionaryPageSize < 64 ? 64 : dictionaryPageSize)
+         .withDictionaryEncoding(dictionaryPageSize != 0)
+         .withPageSize(pageSize)
+         .withDataModel(GenericData.get())
+         .withSchema(projection)
+         .build()) {
+       for (Record record : reader) {
+         writer.write(record);
+         count += 1;
+       }
+     }
+     threw = false;
+   } catch (RuntimeException e) {
+     throw new RuntimeException("Failed on record " + count, e);
+   } finally {
+     if (reader instanceof Closeable) {
+       Closeables.close((Closeable) reader, threw);
+     }
+   }
+   // Reaching this point means every record was written and the writer was closed.
+   return 0;
+ }
+ /** Returns help examples: each "# ..." description line is followed by an argument string. */
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Create a Parquet file from an Avro file",
+ "sample.avro -o sample.parquet",
+ "# Create a Parquet file in S3 from a local Avro file",
+ "path/to/sample.avro -o s3:/user/me/sample.parquet",
+ "# Create a Parquet file from Avro data in S3",
+ "s3:/data/path/sample.avro -o sample.parquet"
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
new file mode 100644
index 0000000..0bd77a3
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.commons.lang.StringUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.cli.Util.encodingStatsAsString;
+import static org.apache.parquet.cli.Util.encodingsAsString;
+import static org.apache.parquet.cli.Util.humanReadable;
+import static org.apache.parquet.cli.Util.minMaxAsString;
+import static org.apache.parquet.cli.Util.primitive;
+import static org.apache.parquet.cli.Util.shortCodec;
+
+@Parameters(commandDescription="Print a Parquet file's metadata")
+public class ParquetMetadataCommand extends BaseCommand {
+ /** Creates the command and passes {@code console} on to the {@link BaseCommand} constructor. */
+ public ParquetMetadataCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(description = "<parquet path>")
+ List<String> targets;
+ /** Prints the footer metadata of the single target Parquet file to the console; returns 0. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+ Preconditions.checkArgument(targets != null && targets.size() >= 1,
+ "A Parquet file is required.");
+ Preconditions.checkArgument(targets.size() == 1,
+ "Cannot process multiple Parquet files.");
+ // Read the file footer; everything printed below comes from it.
+ String source = targets.get(0);
+ ParquetMetadata footer = ParquetFileReader.readFooter(
+ getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER);
+ // File-level summary: the path as given on the command line and the footer's created-by value.
+ console.info("\nFile path: {}", source);
+ console.info("Created by: {}", footer.getFileMetaData().getCreatedBy());
+ // Key/value properties, right-aligned to the length of the longest key.
+ Map<String, String> kv = footer.getFileMetaData().getKeyValueMetaData();
+ if (kv != null && !kv.isEmpty()) {
+ console.info("Properties:");
+ String format = " %" + maxSize(kv.keySet()) + "s: %s";
+ for (Map.Entry<String, String> entry : kv.entrySet()) {
+ console.info(String.format(format, entry.getKey(), entry.getValue()));
+ }
+ } else {
+ console.info("Properties: (none)");
+ }
+ // Print the schema, then one section per row group in footer order.
+ MessageType schema = footer.getFileMetaData().getSchema();
+ console.info("Schema:\n{}", schema);
+
+ List<BlockMetaData> rowGroups = footer.getBlocks();
+ for (int index = 0, n = rowGroups.size(); index < n; index += 1) {
+ printRowGroup(console, index, rowGroups.get(index), schema);
+ }
+
+ console.info("");
+
+ return 0;
+ }
+ /** Returns help examples in the "# description" / argument-string form the other commands use. */
+ @Override
+ public List<String> getExamples() {
+   return Lists.newArrayList(
+       "# Print the metadata for a Parquet file", "sample.parquet");
+ }
+ /** Returns the length of the longest string in {@code strings}, or 0 when there are none. */
+ private int maxSize(Iterable<String> strings) {
+   int longest = 0;
+   for (String candidate : strings) {
+     longest = candidate.length() > longest ? candidate.length() : longest;
+   }
+   return longest;
+ }
+ /** Logs the header line for row group {@code index} followed by one row per column chunk. */
+ private void printRowGroup(Logger console, int index, BlockMetaData rowGroup, MessageType schema) {
+   long start = rowGroup.getStartingPos();
+   long rowCount = rowGroup.getRowCount();
+   long compressedSize = rowGroup.getCompressedSize();
+   String filePath = rowGroup.getPath();
+   // The " path: ..." suffix is only printed when the row group reports a file path.
+   // The per-record figure is compressed bytes / row count, computed in float.
+   console.info(String.format("\nRow group %d: count: %d %s records start: %d total: %s%s\n%s",
+       index, rowCount,
+       humanReadable(((float) compressedSize) / rowCount),
+       start, humanReadable(compressedSize),
+       filePath != null ? " path: " + filePath : "",
+       StringUtils.leftPad("", 80, '-')));
+   // Width of the name column; at least 1 because java.util.Formatter rejects "%-0s".
+   int size = Math.max(1, maxSize(Iterables.transform(rowGroup.getColumns(),
+       new Function<ColumnChunkMetaData, String>() {
+         @Override
+         public String apply(@Nullable ColumnChunkMetaData input) {
+           return input == null ? "" : input.getPath().toDotString();
+         }
+       })));
+
+   console.info(String.format("%-" + size + "s %-9s %-9s %-9s %-10s %-7s %s",
+       "", "type", "encodings", "count", "avg size", "nulls", "min / max"));
+   for (ColumnChunkMetaData column : rowGroup.getColumns()) {
+     printColumnChunk(console, size, column, schema);
+   }
+ }
+ /** Logs one summary row for {@code column}, left-aligning its name in {@code width} characters. */
+ private void printColumnChunk(Logger console, int width, ColumnChunkMetaData column, MessageType schema) {
+ String[] path = column.getPath().toArray();
+ PrimitiveType type = primitive(schema, path);
+ Preconditions.checkNotNull(type);
+ // Gather the chunk's size, value count, codec, encodings and statistics.
+ ColumnDescriptor desc = schema.getColumnDescription(path);
+ long size = column.getTotalSize();
+ long count = column.getValueCount();
+ float perValue = ((float) size) / count;
+ CompressionCodecName codec = column.getCodec();
+ Set<Encoding> encodings = column.getEncodings();
+ EncodingStats encodingStats = column.getEncodingStats();
+ String encodingSummary = encodingStats == null ?
+ encodingsAsString(encodings, desc) :
+ encodingStatsAsString(encodingStats);
+ Statistics stats = column.getStatistics();
+ // Dotted column path, used as the row label.
+ String name = column.getPath().toDotString();
+ // FIXED_LEN_BYTE_ARRAY columns print FIXED[typeLength] in place of the type name.
+ PrimitiveType.PrimitiveTypeName typeName = type.getPrimitiveTypeName();
+ if (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ console.info(String.format("%-" + width + "s FIXED[%d] %s %-7s %-9d %-8s %-7s %s",
+ name, type.getTypeLength(), shortCodec(codec), encodingSummary, count,
+ humanReadable(perValue), stats == null ? "" : String.valueOf(stats.getNumNulls()),
+ minMaxAsString(stats, type.getOriginalType())));
+ } else {
+ console.info(String.format("%-" + width + "s %-9s %s %-7s %-9d %-10s %-7s %s",
+ name, typeName, shortCodec(codec), encodingSummary, count, humanReadable(perValue),
+ stats == null ? "" : String.valueOf(stats.getNumNulls()),
+ minMaxAsString(stats, type.getOriginalType())));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
new file mode 100644
index 0000000..ea2306f
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Formats;
+import org.apache.avro.file.SeekableInput;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+@Parameters(commandDescription="Print the Avro schema for a file")
+public class SchemaCommand extends BaseCommand {
+ /** Creates the command and passes {@code console} on to the {@link BaseCommand} constructor. */
+ public SchemaCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(description = "<parquet path>")
+ List<String> targets;
+
+ @Parameter(
+ names={"-o", "--output"},
+ description="Output file path")
+ String outputPath = null;
+
+ @Parameter(
+ names={"--overwrite"},
+ description="Overwrite the output file if it exists")
+ boolean overwrite = false;
+
+ @Parameter(
+ names={"--parquet"},
+ description="Print a Parquet schema, without converting to Avro",
+ hidden=true)
+ boolean parquetSchema = false;
+ /** Prints each target's schema, or writes a single target's schema to {@code outputPath}. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+   Preconditions.checkArgument(targets != null && targets.size() >= 1,
+       "Parquet file is required.");
+   // Several targets are accepted, but only when printing to the console (no --output).
+   if (targets.size() > 1) {
+     Preconditions.checkArgument(outputPath == null,
+         "Cannot output multiple schemas to file " + outputPath);
+     for (String source : targets) {
+       console.info("{}: {}", source, getSchema(source));
+     }
+
+   } else {
+     String source = targets.get(0);
+
+     if (outputPath != null) {
+       Path outPath = qualifiedPath(outputPath);
+       FileSystem outFS = outPath.getFileSystem(getConf());
+       if (overwrite && outFS.exists(outPath)) {
+         console.debug("Deleting output file {} (already exists)", outPath);
+         outFS.delete(outPath, true); // same effect as the deprecated delete(Path)
+       }
+       // The schema text is written as UTF-8 bytes; the stream is closed by try-with-resources.
+       try (OutputStream out = create(outputPath)) {
+         out.write(getSchema(source).getBytes(StandardCharsets.UTF_8));
+       }
+     } else {
+       console.info(getSchema(source));
+     }
+   }
+
+   return 0;
+ }
+ /** Returns help examples: each "# ..." description line is followed by an argument string. */
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Print the Avro schema for a Parquet file",
+ "sample.parquet",
+ "# Print the Avro schema for an Avro file",
+ "sample.avro",
+ "# Print the Avro schema for a JSON file",
+ "sample.json"
+ );
+ }
+ /** Returns the Parquet schema text when --parquet is set, else the pretty-printed Avro schema. */
+ private String getSchema(String source) throws IOException {
+   if (!parquetSchema) {
+     return getAvroSchema(source).toString(true);
+   }
+
+   return getParquetSchema(source);
+ }
+ /** Returns the Parquet schema text of {@code source}; rejects any format other than Parquet. */
+ private String getParquetSchema(String source) throws IOException {
+   try (SeekableInput in = openSeekable(source)) {
+     Formats.Format format = Formats.detectFormat((InputStream) in);
+     in.seek(0);
+     // ParquetFileReader is Closeable; close it as soon as the schema has been read.
+     switch (format) {
+       case PARQUET:
+         try (ParquetFileReader reader = new ParquetFileReader(
+             getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)) {
+           return reader.getFileMetaData().getSchema().toString();
+         }
+       default:
+         throw new IllegalArgumentException(String.format(
+             "Could not get a Parquet schema for format %s: %s", format, source));
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
new file mode 100644
index 0000000..db427c9
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.Util;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.List;
+
+// TODO: show dictionary size in values and in bytes
+@Parameters(commandDescription="Print dictionaries for a Parquet column")
+public class ShowDictionaryCommand extends BaseCommand {
+ /** Creates the command and passes {@code console} on to the {@link BaseCommand} constructor. */
+ public ShowDictionaryCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(description = "<parquet path>")
+ List<String> targets;
+
+ @Parameter(
+ names = {"-c", "--column"},
+ description = "Column path",
+ required = true)
+ String column;
+ /** Prints the dictionary of {@code column} for every row group of the target file; returns 0. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+   Preconditions.checkArgument(targets != null && targets.size() >= 1,
+       "A Parquet file is required.");
+   Preconditions.checkArgument(targets.size() == 1,
+       "Cannot process multiple Parquet files.");
+
+   String source = targets.get(0);
+
+   // try-with-resources closes the file reader on every exit path, including failures.
+   try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+     MessageType schema = reader.getFileMetaData().getSchema();
+     ColumnDescriptor descriptor = Util.descriptor(column, schema);
+     PrimitiveType type = Util.primitive(column, schema);
+     Preconditions.checkNotNull(type);
+
+     DictionaryPageReadStore dictionaryReader;
+     int rowGroup = 0;
+     while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
+       DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);
+       if (page == null) {
+         // A column chunk need not have a dictionary page; report that instead of failing.
+         console.info("\nRow group {} has no dictionary for \"{}\"", rowGroup, column);
+       } else {
+         Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
+         console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column);
+         for (int i = 0; i <= dict.getMaxId(); i += 1) {
+           String id = String.format("%6d", i);
+           switch (type.getPrimitiveTypeName()) {
+             case BINARY:
+               if (type.getOriginalType() == OriginalType.UTF8) {
+                 console.info("{}: {}", id,
+                     Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
+               } else {
+                 console.info("{}: {}", id,
+                     Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
+               }
+               break;
+             case INT32:
+               console.info("{}: {}", id, dict.decodeToInt(i));
+               break;
+             case INT64:
+               console.info("{}: {}", id, dict.decodeToLong(i));
+               break;
+             case FLOAT:
+               console.info("{}: {}", id, dict.decodeToFloat(i));
+               break;
+             case DOUBLE:
+               console.info("{}: {}", id, dict.decodeToDouble(i));
+               break;
+             default:
+               throw new IllegalArgumentException(
+                   "Unknown dictionary type: " + type.getPrimitiveTypeName());
+           }
+         }
+       }
+       // Advance the reader so the next getNextDictionaryReader() call sees the following group.
+       reader.skipNextRowGroup();
+       rowGroup += 1;
+     }
+   }
+
+   console.info("");
+   return 0;
+ }
+ /** Returns help examples: each "# ..." description line is followed by an argument string. */
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Show the dictionary for column 'col' from a Parquet file",
+ "-c col sample.parquet"
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
new file mode 100644
index 0000000..beda452
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.commons.lang.StringUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.Page;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.parquet.cli.Util.columnName;
+import static org.apache.parquet.cli.Util.descriptor;
+import static org.apache.parquet.cli.Util.encodingAsString;
+import static org.apache.parquet.cli.Util.humanReadable;
+import static org.apache.parquet.cli.Util.minMaxAsString;
+import static org.apache.parquet.cli.Util.primitive;
+import static org.apache.parquet.cli.Util.shortCodec;
+
+@Parameters(commandDescription="Print page summaries for a Parquet file")
+public class ShowPagesCommand extends BaseCommand {
+ /** Creates the command and passes {@code console} on to the {@link BaseCommand} constructor. */
+ public ShowPagesCommand(Logger console) {
+ super(console);
+ }
+
+ @Parameter(description = "<parquet path>")
+ List<String> targets;
+
+ @Parameter(
+ names = {"-c", "--column", "--columns"},
+ description = "List of columns")
+ List<String> columns;
+ /** Prints a per-page summary, grouped by column, for the single target Parquet file. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public int run() throws IOException {
+   Preconditions.checkArgument(targets != null && targets.size() >= 1,
+       "A Parquet file is required.");
+   Preconditions.checkArgument(targets.size() == 1,
+       "Cannot process multiple Parquet files.");
+   String source = targets.get(0);
+   // accumulate formatted lines to print by column
+   Map<String, List<String>> formatted = Maps.newLinkedHashMap();
+   PageFormatter formatter = new PageFormatter();
+   // try-with-resources closes the file reader on every exit path, including failures.
+   try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+     MessageType schema = reader.getFileMetaData().getSchema();
+     Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
+     if (this.columns == null || this.columns.isEmpty()) {
+       for (ColumnDescriptor descriptor : schema.getColumns()) {
+         columns.put(descriptor, primitive(schema, descriptor.getPath()));
+       }
+     } else {
+       for (String column : this.columns) {
+         columns.put(descriptor(column, schema), primitive(column, schema));
+       }
+     }
+     // The codec of the first column chunk labels every page. A file without row groups has
+     // none; its null codec is never read because the loop below then does not execute.
+     CompressionCodecName codec = reader.getRowGroups().isEmpty() ? null
+         : reader.getRowGroups().get(0).getColumns().get(0).getCodec();
+     PageReadStore pageStore;
+     int rowGroupNum = 0;
+     while ((pageStore = reader.readNextRowGroup()) != null) {
+       for (ColumnDescriptor descriptor : columns.keySet()) {
+         List<String> lines = formatted.get(columnName(descriptor));
+         if (lines == null) {
+           lines = Lists.newArrayList();
+           formatted.put(columnName(descriptor), lines);
+         }
+         formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+         PageReader pages = pageStore.getPageReader(descriptor);
+         DictionaryPage dict = pages.readDictionaryPage();
+         if (dict != null) {
+           lines.add(formatter.format(dict));
+         }
+         DataPage page;
+         while ((page = pages.readPage()) != null) {
+           lines.add(formatter.format(page));
+         }
+       }
+       rowGroupNum += 1;
+     }
+   }
+
+   // TODO: Show total column size and overall size per value in the column summary line
+   for (String columnName : formatted.keySet()) {
+     console.info(String.format("\nColumn: %s\n%s", columnName, StringUtils.leftPad("", 80, '-')));
+     console.info(formatter.getHeader());
+     for (String line : formatted.get(columnName)) {
+       console.info(line);
+     }
+     console.info("");
+   }
+
+   return 0;
+ }
+ /** Returns help examples: each "# ..." description line is followed by an argument string. */
+ @Override
+ public List<String> getExamples() {
+ return Lists.newArrayList(
+ "# Show pages for column 'col' from a Parquet file",
+ "-c col sample.parquet"
+ );
+ }
+ /** Formats dictionary and data pages as fixed-width summary lines; needs no outer-instance state. */
+ private static class PageFormatter implements DataPage.Visitor<String> {
+   private int rowGroupNum;
+   private int pageNum;
+   private PrimitiveType type;
+   private String shortCodec;
+   /** Returns the column header line that goes above the formatted page lines. */
+   String getHeader() {
+     return String.format(" %-6s %-5s %-4s %-7s %-10s %-10s %-8s %-7s %s",
+         "page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max");
+   }
+   /** Stores the row group number, column type and codec label, and resets the page counter. */
+   void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) {
+     this.rowGroupNum = rowGroupNum;
+     this.pageNum = 0;
+     this.type = type;
+     this.shortCodec = shortCodec(codec);
+   }
+   /** Formats {@code page} by kind ("" for any other page kind) and advances the page counter. */
+   String format(Page page) {
+     String formatted = "";
+     if (page instanceof DictionaryPage) {
+       formatted = printDictionaryPage((DictionaryPage) page);
+     } else if (page instanceof DataPage) {
+       formatted = ((DataPage) page).accept(this);
+     }
+     pageNum += 1;
+     return formatted;
+   }
+   /** Formats a dictionary page; labelled "D" when it is the first page since setContext. */
+   private String printDictionaryPage(DictionaryPage dict) {
+     // TODO: the compressed size of a dictionary page is lost in Parquet
+     // Only the size reported by getCompressedSize() is printed.
+     long totalSize = dict.getCompressedSize();
+     int count = dict.getDictionarySize();
+     float perValue = ((float) totalSize) / count;
+     String enc = encodingAsString(dict.getEncoding(), true);
+     if (pageNum == 0) {
+       return String.format("%3d-D %-5s %s %-2s %-7d %-10s %-10s",
+           rowGroupNum, "dict", shortCodec, enc, count, humanReadable(perValue),
+           humanReadable(totalSize));
+     } else {
+       return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s",
+           rowGroupNum, pageNum, "dict", shortCodec, enc, count, humanReadable(perValue),
+           humanReadable(totalSize));
+     }
+   }
+   /** Formats a v1 data page; the "rows" column is left empty for this page kind. */
+   @Override
+   public String visit(DataPageV1 page) {
+     String enc = encodingAsString(page.getValueEncoding(), false);
+     long totalSize = page.getCompressedSize();
+     int count = page.getValueCount();
+     long numNulls = page.getStatistics().getNumNulls();
+     float perValue = ((float) totalSize) / count;
+     String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+     return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
+         rowGroupNum, pageNum, "data", shortCodec, enc, count, humanReadable(perValue),
+         humanReadable(totalSize), "", numNulls, minMax);
+   }
+   /** Formats a v2 data page; prints "_" in place of the codec when the page is not compressed. */
+   @Override
+   public String visit(DataPageV2 page) {
+     String enc = encodingAsString(page.getDataEncoding(), false);
+     long totalSize = page.getCompressedSize();
+     int count = page.getValueCount();
+     int numRows = page.getRowCount();
+     int numNulls = page.getNullCount();
+     float perValue = ((float) totalSize) / count;
+     String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+     String compression = (page.isCompressed() ? shortCodec : "_");
+     return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s",
+         rowGroupNum, pageNum, "data", compression, enc, count, humanReadable(perValue),
+         humanReadable(totalSize), numRows, numNulls, minMax);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
new file mode 100644
index 0000000..ceb11cf
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Codecs;
+import org.apache.parquet.cli.util.Schemas;
+import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.apache.parquet.cli.util.Expressions.filterSchema;
+
+ /**
+  * Command that reads the records of one data file and writes them to a new
+  * Avro data file, optionally keeping only a subset of the columns.
+  *
+  * The input is the single positional argument; the output location is
+  * given with the required {@code -o} / {@code --output} option.
+  */
+ @Parameters(commandDescription="Create an Avro file from a data file")
+ public class ToAvroCommand extends BaseCommand {
+
+   public ToAvroCommand(Logger console) {
+     super(console);
+   }
+
+   // positional arguments; run() requires exactly one input file
+   @Parameter(description = "<file>")
+   List<String> targets;
+
+   @Parameter(
+       names={"-o", "--output"},
+       description="Output file path",
+       required=true)
+   String outputPath = null;
+
+   // optional: when set, this schema is used instead of the one read from the input
+   @Parameter(names = {"-s", "--schema"},
+       description = "The file containing an Avro schema for the output file")
+   String avroSchemaFile;
+
+   // optional: columns passed to filterSchema to build the projection
+   @Parameter(
+       names = {"-c", "--column", "--columns"},
+       description = "List of columns")
+   List<String> columns;
+
+   @Parameter(names = {"--compression-codec"},
+       description = "A compression codec name.")
+   String compressionCodecName = "GZIP";
+
+   @Parameter(
+       names={"--overwrite"},
+       description="Overwrite the output file if it exists")
+   boolean overwrite = false;
+
+   /**
+    * Copies every record of the input file into the Avro output file.
+    *
+    * @return 0 once all records have been written
+    * @throws IllegalArgumentException if not exactly one input file was given
+    * @throws IOException if reading the input or writing the output fails
+    * @throws RuntimeException wrapping any runtime failure raised while
+    *         copying; the message carries the number of records appended
+    *         before the failure
+    */
+   @Override
+   @SuppressWarnings("unchecked")
+   public int run() throws IOException {
+     Preconditions.checkArgument(targets != null && targets.size() == 1,
+         "A data file is required.");
+
+     String source = targets.get(0);
+
+     CodecFactory codecFactory = Codecs.avroCodec(compressionCodecName);
+
+     // an explicit schema file takes precedence over the schema of the input
+     Schema schema;
+     if (avroSchemaFile != null) {
+       schema = Schemas.fromAvsc(open(avroSchemaFile));
+     } else {
+       schema = getAvroSchema(source);
+     }
+     Schema projection = filterSchema(schema, columns);
+
+     Path outPath = qualifiedPath(outputPath);
+     FileSystem outFS = outPath.getFileSystem(getConf());
+     // existing output is only removed when --overwrite was requested
+     if (overwrite && outFS.exists(outPath)) {
+       console.debug("Deleting output file {} (already exists)", outPath);
+       outFS.delete(outPath);
+     }
+
+     Iterable<Record> reader = openDataFile(source, projection);
+     boolean threw = true;
+     long count = 0;
+     try {
+       // records are read with the projection, so the writer uses it as well;
+       // DataFileWriter.create installs the same schema on the datum writer
+       DatumWriter<Record> datumWriter = new GenericDatumWriter<>(projection);
+       DataFileWriter<Record> w = new DataFileWriter<>(datumWriter);
+       w.setCodec(codecFactory);
+
+       try (DataFileWriter<Record> writer = w.create(projection, create(outputPath))) {
+         for (Record record : reader) {
+           writer.append(record);
+           count += 1;
+         }
+       }
+       threw = false;
+     } catch (RuntimeException e) {
+       throw new RuntimeException("Failed on record " + count, e);
+     } finally {
+       // when the copy failed, a secondary failure from close() is swallowed
+       // so that it cannot hide the original exception
+       if (reader instanceof Closeable) {
+         Closeables.close((Closeable) reader, threw);
+       }
+     }
+
+     return 0;
+   }
+
+   /**
+    * Returns usage examples for this command.
+    *
+    * Each example passes one input file and names the output with
+    * {@code -o}, matching what {@link #run()} accepts.
+    *
+    * @return alternating comment and example lines
+    */
+   @Override
+   public List<String> getExamples() {
+     return Lists.newArrayList(
+         "# Create an Avro file from a Parquet file",
+         "sample.parquet -o sample.avro",
+         "# Create an Avro file in HDFS from a local JSON file",
+         "path/to/sample.json -o hdfs:/user/me/sample.avro",
+         "# Create an Avro file from data in S3",
+         "s3:/data/path/sample.parquet -o sample.avro"
+     );
+   }
+ }
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java
new file mode 100644
index 0000000..47cd665
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import au.com.bytecode.opencsv.CSVParser;
+import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static java.lang.Math.min;
+
+ /**
+  * Helpers for reading CSV data as Avro: factory methods for the CSV
+  * reader/parser and schema inference from the first lines of a CSV stream.
+  */
+ public class AvroCSV {
+
+   private static final Pattern LONG = Pattern.compile("\\d+");
+   // At least one digit is required on one side of the decimal point so that
+   // a lone "." (or ".d" / ".f") is not mistaken for a number.
+   private static final Pattern DOUBLE = Pattern.compile("(\\d+\\.\\d*|\\.\\d+)[dD]?");
+   private static final Pattern FLOAT = Pattern.compile("(\\d+\\.\\d*|\\.\\d+)[fF]?");
+   // maximum number of data lines examined when inferring a schema
+   private static final int DEFAULT_INFER_LINES = 25;
+   private static final Set<String> NO_REQUIRED_FIELDS = ImmutableSet.of();
+   // As per the Avro spec (http://avro.apache.org/docs/1.7.5/spec.html), a
+   // name must start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
+   private static final Pattern AVRO_COMPATIBLE = Pattern.
+       compile("^[A-Za-z_][A-Za-z\\d_]*$");
+
+   /**
+    * Creates a CSV reader over the stream using the charset, delimiter,
+    * quote, escape and lines-to-skip settings from {@code props}. Only the
+    * first character of the delimiter, quote and escape strings is used.
+    */
+   static CSVReader newReader(InputStream incoming, CSVProperties props) {
+     return new CSVReader(
+         new InputStreamReader(incoming, Charset.forName(props.charset)),
+         props.delimiter.charAt(0), props.quote.charAt(0),
+         props.escape.charAt(0), props.linesToSkip,
+         false /* strict quotes off: don't ignore unquoted strings */,
+         true /* ignore leading white-space */ );
+   }
+
+   /**
+    * Creates a single-line CSV parser with the same delimiter, quote and
+    * escape settings that {@link #newReader} uses.
+    */
+   static CSVParser newParser(CSVProperties props) {
+     return new CSVParser(
+         props.delimiter.charAt(0), props.quote.charAt(0),
+         props.escape.charAt(0),
+         false /* strict quotes off: don't ignore unquoted strings */,
+         true /* ignore leading white-space */ );
+   }
+
+   /**
+    * Infers a record schema in which every field is nullable.
+    *
+    * @param name name of the resulting record schema
+    * @param incoming CSV content to sample
+    * @param props CSV parsing settings
+    * @return the inferred schema
+    * @throws IOException if the content cannot be read
+    */
+   public static Schema inferNullableSchema(String name, InputStream incoming,
+                                            CSVProperties props)
+       throws IOException {
+     return inferSchemaInternal(name, incoming, props, NO_REQUIRED_FIELDS, true);
+   }
+
+   /**
+    * Infers a record schema in which every field is nullable except those
+    * named in {@code requiredFields}.
+    *
+    * @param name name of the resulting record schema
+    * @param incoming CSV content to sample
+    * @param props CSV parsing settings
+    * @param requiredFields names of fields that must not be nullable
+    * @return the inferred schema
+    * @throws IOException if the content cannot be read
+    */
+   public static Schema inferNullableSchema(String name, InputStream incoming,
+                                            CSVProperties props,
+                                            Set<String> requiredFields)
+       throws IOException {
+     return inferSchemaInternal(name, incoming, props, requiredFields, true);
+   }
+
+   /**
+    * Infers a record schema in which a field is nullable only when a missing
+    * value was seen for it in the sampled lines.
+    *
+    * @param name name of the resulting record schema
+    * @param incoming CSV content to sample
+    * @param props CSV parsing settings
+    * @return the inferred schema
+    * @throws IOException if the content cannot be read
+    */
+   public static Schema inferSchema(String name, InputStream incoming,
+                                    CSVProperties props)
+       throws IOException {
+     return inferSchemaInternal(name, incoming, props, NO_REQUIRED_FIELDS, false);
+   }
+
+   /**
+    * Infers a record schema in which a field is nullable only when a missing
+    * value was seen for it; fields named in {@code requiredFields} are never
+    * nullable.
+    *
+    * @param name name of the resulting record schema
+    * @param incoming CSV content to sample
+    * @param props CSV parsing settings
+    * @param requiredFields names of fields that must not be nullable
+    * @return the inferred schema
+    * @throws IOException if the content cannot be read
+    */
+   public static Schema inferSchema(String name, InputStream incoming,
+                                    CSVProperties props,
+                                    Set<String> requiredFields)
+       throws IOException {
+     return inferSchemaInternal(name, incoming, props, requiredFields, false);
+   }
+
+   /**
+    * Shared implementation of the public inference methods.
+    *
+    * The header comes from the first line of the file, from
+    * {@code props.header}, or is generated as {@code field_<i>}. Up to
+    * {@link #DEFAULT_INFER_LINES} data lines are then scanned; the type of a
+    * column is taken from the first non-empty value seen for it.
+    *
+    * @throws NullPointerException if there is no data line to sample
+    * @throws RuntimeException if a header name is null, empty or not a valid
+    *         Avro name, or if a required field had a missing value
+    */
+   private static Schema inferSchemaInternal(String name, InputStream incoming,
+                                             CSVProperties props,
+                                             Set<String> requiredFields,
+                                             boolean makeNullable)
+       throws IOException {
+     CSVReader reader = newReader(incoming, props);
+
+     String[] header;
+     String[] line;
+     if (props.useHeader) {
+       // read the header and then the first line
+       header = reader.readNext();
+       line = reader.readNext();
+       Preconditions.checkNotNull(line, "No content to infer schema");
+
+     } else if (props.header != null) {
+       header = newParser(props).parseLine(props.header);
+       line = reader.readNext();
+       Preconditions.checkNotNull(line, "No content to infer schema");
+
+     } else {
+       // use the first line to create a header
+       line = reader.readNext();
+       Preconditions.checkNotNull(line, "No content to infer schema");
+       header = new String[line.length];
+       for (int i = 0; i < line.length; i += 1) {
+         header[i] = "field_" + String.valueOf(i);
+       }
+     }
+
+     Schema.Type[] types = new Schema.Type[header.length];
+     String[] values = new String[header.length];   // sample value behind each type
+     boolean[] nullable = new boolean[header.length]; // a value was missing
+     boolean[] empty = new boolean[header.length];    // an empty string was seen
+
+     for (int processed = 0; processed < DEFAULT_INFER_LINES; processed += 1) {
+       if (line == null) {
+         break;
+       }
+
+       for (int i = 0; i < header.length; i += 1) {
+         if (i < line.length) {
+           if (types[i] == null) {
+             types[i] = inferFieldType(line[i]);
+             if (types[i] != null) {
+               // keep track of the value used
+               values[i] = line[i];
+             }
+           }
+
+           if (line[i] == null) {
+             nullable[i] = true;
+           } else if (line[i].isEmpty()) {
+             empty[i] = true;
+           }
+         } else {
+           // no value results in null
+           nullable[i] = true;
+         }
+       }
+
+       line = reader.readNext();
+     }
+
+     SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder.record(name).fields();
+
+     // types may be missing, but fieldSchema will return a nullable string
+     for (int i = 0; i < header.length; i += 1) {
+       if (header[i] == null) {
+         throw new RuntimeException("Bad header for field " + i + ": null");
+       }
+
+       String fieldName = header[i].trim();
+
+       if (fieldName.isEmpty()) {
+         throw new RuntimeException(
+             "Bad header for field " + i + ": \"" + fieldName + "\"");
+       } else if(!isAvroCompatibleName(fieldName)) {
+         throw new RuntimeException(
+             "Bad header for field, should start with a character " +
+                 "or _ and can contain only alphanumerics and _ " +
+                 i + ": \"" + fieldName + "\"");
+       }
+
+       // the empty string is not considered null for string fields
+       boolean foundNull = (nullable[i] ||
+           (empty[i] && types[i] != Schema.Type.STRING));
+
+       if (requiredFields.contains(fieldName)) {
+         if (foundNull) {
+           throw new RuntimeException("Found null value for required field: " +
+               fieldName + " (" + types[i] + ")");
+         }
+         fieldAssembler = fieldAssembler.name(fieldName)
+             .doc("Type inferred from '" + sample(values[i]) + "'")
+             .type(schema(types[i], false)).noDefault();
+       } else {
+         SchemaBuilder.GenericDefault<Schema> defaultBuilder = fieldAssembler.name(fieldName)
+             .doc("Type inferred from '" + sample(values[i]) + "'")
+             .type(schema(types[i], makeNullable || foundNull));
+         if (makeNullable || foundNull) {
+           fieldAssembler = defaultBuilder.withDefault(null);
+         } else {
+           fieldAssembler = defaultBuilder.noDefault();
+         }
+       }
+     }
+     return fieldAssembler.endRecord();
+   }
+
+   // matches every character outside the printable ASCII range 0x20-0x7e
+   private static final CharMatcher NON_PRINTABLE = CharMatcher
+       .inRange('\u0020', '\u007e').negate();
+
+   /**
+    * Returns at most the first 50 characters of the value with non-printable
+    * characters replaced by '.', or the text "null" for a null value.
+    */
+   private static String sample(String value) {
+     if (value != null) {
+       return NON_PRINTABLE.replaceFrom(
+           value.subSequence(0, min(50, value.length())), '.');
+     } else {
+       return "null";
+     }
+   }
+
+   /**
+    * Create a {@link Schema} for the given type. If the type is null,
+    * the schema will be a nullable String. If makeNullable is true, the
+    * returned schema will be nullable.
+    *
+    * @param type a {@link Schema.Type} compatible with {@code Schema.create}
+    * @param makeNullable If {@code true}, the return type will be nullable
+    * @return a {@code Schema} for the given {@code Schema.Type}
+    * @see Schema#create(org.apache.avro.Schema.Type)
+    */
+   private static Schema schema(Schema.Type type, boolean makeNullable) {
+     Schema schema = Schema.create(type == null ? Schema.Type.STRING : type);
+     if (makeNullable || type == null) {
+       schema = Schema.createUnion(Lists.newArrayList(
+           Schema.create(Schema.Type.NULL), schema));
+     }
+     return schema;
+   }
+
+   /**
+    * Guesses the Avro type of a single CSV value.
+    *
+    * @param example a raw CSV value, possibly null or empty
+    * @return LONG for digits only, DOUBLE or FLOAT for decimal literals with
+    *         at least one digit, STRING otherwise; null when the value is
+    *         null or empty and carries no type information
+    */
+   private static Schema.Type inferFieldType(String example) {
+     if (example == null || example.isEmpty()) {
+       return null; // not enough information
+     } else if (LONG.matcher(example).matches()) {
+       return Schema.Type.LONG;
+     } else if (DOUBLE.matcher(example).matches()) {
+       return Schema.Type.DOUBLE;
+     } else if (FLOAT.matcher(example).matches()) {
+       // only reached for literals with an f/F suffix; others match DOUBLE
+       return Schema.Type.FLOAT;
+     }
+     return Schema.Type.STRING;
+   }
+
+   /**
+    * Returns true if the name does not contain characters that are known to be
+    * incompatible with the specs defined in Avro schema.
+    *
+    * @param name a String field name to check
+    * @return true if the name is Avro compatible, false if not
+    */
+   private static boolean isAvroCompatibleName(String name) {
+     return AVRO_COMPATIBLE.matcher(name).matches();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java
new file mode 100644
index 0000000..8d5e835
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.util.RuntimeIOException;
+import org.apache.avro.Schema;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+ /**
+  * Reads CSV lines from a stream and converts each one into a record of
+  * type {@code E} using a {@link RecordBuilder}.
+  *
+  * The reader is its own iterator: {@link #iterator()} returns this object,
+  * so the content can be traversed only once.
+  *
+  * @param <E> the type of record produced
+  */
+ public class AvroCSVReader<E> implements Iterator<E>, Iterable<E>, Closeable {
+
+   private final boolean reuseRecords;
+   private final CSVReader reader;
+   private final RecordBuilder<E> builder;
+   // true while "next" holds a line that has not been returned yet
+   private boolean hasNext = false;
+   private String[] next = null;
+   // last record produced; only used when reuseRecords is set
+   private E record = null;
+
+   /**
+    * Opens a reader and reads ahead to the first data line.
+    *
+    * @param stream CSV content
+    * @param props CSV parsing settings, including where the header comes from
+    * @param schema record schema of the produced records
+    * @param type class of the produced records
+    * @param reuseRecords if true, the same record instance is refilled and
+    *        returned by every call to {@link #next()}
+    * @throws IllegalArgumentException if the schema is not a record schema
+    * @throws RuntimeIOException if the header or first line cannot be read
+    */
+   public AvroCSVReader(InputStream stream, CSVProperties props,
+                        Schema schema, Class<E> type, boolean reuseRecords) {
+     this.reader = AvroCSV.newReader(stream, props);
+     this.reuseRecords = reuseRecords;
+
+     Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
+         "Schemas for CSV files must be records of primitive types");
+
+     List<String> header = null;
+     if (props.useHeader) {
+       this.hasNext = advance();
+       if (hasNext) {
+         header = Lists.newArrayList(next);
+       } else {
+         // empty input: there is no header line, so use an empty header and
+         // let the reader behave as an empty iterator instead of failing
+         // with a NullPointerException
+         header = Lists.newArrayList();
+       }
+     } else if (props.header != null) {
+       try {
+         header = Lists.newArrayList(
+             AvroCSV.newParser(props).parseLine(props.header));
+       } catch (IOException e) {
+         throw new RuntimeIOException(
+             "Failed to parse header from properties: " + props.header, e);
+       }
+     }
+
+     this.builder = new RecordBuilder<>(schema, type, header);
+
+     // initialize by reading the first record
+     this.hasNext = advance();
+   }
+
+   @Override
+   public boolean hasNext() {
+     return hasNext;
+   }
+
+   /**
+    * Converts the buffered line into a record and reads ahead to the next
+    * line. The read-ahead happens even when the conversion throws.
+    *
+    * @throws NoSuchElementException if no line is left
+    */
+   @Override
+   public E next() {
+     if (!hasNext) {
+       throw new NoSuchElementException();
+     }
+
+     try {
+       if (reuseRecords) {
+         this.record = builder.makeRecord(next, record);
+         return record;
+       } else {
+         return builder.makeRecord(next, null);
+       }
+     } finally {
+       this.hasNext = advance();
+     }
+   }
+
+   /**
+    * Reads the next CSV line into {@code next}.
+    *
+    * @return true if a line was read, false when the reader returned null
+    */
+   private boolean advance() {
+     try {
+       next = reader.readNext();
+     } catch (IOException ex) {
+       throw new RuntimeIOException("Could not read record", ex);
+     }
+     return (next != null);
+   }
+
+   @Override
+   public void close() {
+     try {
+       reader.close();
+     } catch (IOException e) {
+       throw new RuntimeIOException("Cannot close reader", e);
+     }
+   }
+
+   @Override
+   public void remove() {
+     throw new UnsupportedOperationException("Remove is not implemented.");
+   }
+
+   @Override
+   public Iterator<E> iterator() {
+     return this;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java
new file mode 100644
index 0000000..bd4ba06
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import javax.annotation.concurrent.Immutable;
+import org.apache.commons.lang.StringEscapeUtils;
+
+ /**
+  * Immutable bundle of CSV parsing settings: charset, delimiter, quote and
+  * escape strings, header handling and the number of lines to skip.
+  * Instances are created through {@link Builder}.
+  */
+ @Immutable
+ public class CSVProperties {
+
+   public static final String DEFAULT_CHARSET = "utf8";
+   public static final String DEFAULT_DELIMITER = ",";
+   public static final String DEFAULT_QUOTE = "\"";
+   public static final String DEFAULT_ESCAPE = "\\";
+   public static final String DEFAULT_HAS_HEADER = "false";
+   public static final int DEFAULT_LINES_TO_SKIP = 0;
+
+   // configuration
+   public final String charset;
+   public final String delimiter;
+   public final String quote;
+   public final String escape;
+   // explicit header line, or null when none was supplied
+   public final String header;
+   // whether the builder was told that the file has a header
+   public final boolean useHeader;
+   public final int linesToSkip;
+
+   // copies the current state of the builder
+   private CSVProperties(Builder source) {
+     charset = source.charset;
+     delimiter = source.delimiter;
+     quote = source.quote;
+     escape = source.escape;
+     header = source.header;
+     useHeader = source.useHeader;
+     linesToSkip = source.linesToSkip;
+   }
+
+   /**
+    * Fluent builder for {@link CSVProperties}. Every setting starts at its
+    * {@code DEFAULT_*} value; the explicit header starts as null.
+    */
+   public static class Builder {
+     private String charset = DEFAULT_CHARSET;
+     private String delimiter = DEFAULT_DELIMITER;
+     private String quote = DEFAULT_QUOTE;
+     private String escape = DEFAULT_ESCAPE;
+     private boolean useHeader = Boolean.parseBoolean(DEFAULT_HAS_HEADER);
+     private int linesToSkip = DEFAULT_LINES_TO_SKIP;
+     private String header = null;
+
+     public Builder charset(String charset) {
+       this.charset = charset;
+       return this;
+     }
+
+     /** Sets the delimiter after unescaping Java escape sequences in it. */
+     public Builder delimiter(String delimiter) {
+       String unescaped = StringEscapeUtils.unescapeJava(delimiter);
+       this.delimiter = unescaped;
+       return this;
+     }
+
+     /** Sets the quote string after unescaping Java escape sequences in it. */
+     public Builder quote(String quote) {
+       String unescaped = StringEscapeUtils.unescapeJava(quote);
+       this.quote = unescaped;
+       return this;
+     }
+
+     /** Sets the escape string after unescaping Java escape sequences in it. */
+     public Builder escape(String escape) {
+       String unescaped = StringEscapeUtils.unescapeJava(escape);
+       this.escape = unescaped;
+       return this;
+     }
+
+     public Builder header(String header) {
+       this.header = header;
+       return this;
+     }
+
+     /** Marks the file as having a header. */
+     public Builder hasHeader() {
+       return hasHeader(true);
+     }
+
+     public Builder hasHeader(boolean hasHeader) {
+       this.useHeader = hasHeader;
+       return this;
+     }
+
+     public Builder linesToSkip(int linesToSkip) {
+       this.linesToSkip = linesToSkip;
+       return this;
+     }
+
+     /** Creates the properties object from the values set so far. */
+     public CSVProperties build() {
+       return new CSVProperties(this);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
new file mode 100644
index 0000000..9adf22e
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import org.apache.parquet.cli.util.RecordException;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import java.util.List;
+
+class RecordBuilder<E> {
+ private final Schema schema;
+ private final Class<E> recordClass;
+ private final Schema.Field[] fields;
+ private final int[] indexes; // Record position to CSV field position
+
+ public RecordBuilder(Schema schema, Class<E> recordClass, List<String> header) {
+ this.schema = schema;
+ this.recordClass = recordClass;
+
+ // initialize the index and field arrays
+ fields = schema.getFields().toArray(new Schema.Field[schema.getFields().size()]);
+ indexes = new int[fields.length];
+
+ if (header != null) {
+ for (int i = 0; i < fields.length; i += 1) {
+ fields[i] = schema.getFields().get(i);
+ indexes[i] = Integer.MAX_VALUE; // never present in the row
+ }
+
+ // there's a header in next
+ for (int i = 0; i < header.size(); i += 1) {
+ Schema.Field field = schema.getField(header.get(i));
+ if (field != null) {
+ indexes[field.pos()] = i;
+ }
+ }
+
+ } else {
+ // without a header, map to fields by position
+ for (int i = 0; i < fields.length; i += 1) {
+ fields[i] = schema.getFields().get(i);
+ indexes[i] = i;
+ }
+ }
+ }
+
+ public E makeRecord(String[] fields, E reuse) {
+ E record = reuse;
+ if (record == null) {
+ record = newRecordInstance();
+ }
+
+ if (record instanceof IndexedRecord) {
+ fillIndexed((IndexedRecord) record, fields);
+ } else {
+ fillReflect(record, fields);
+ }
+
+ return record;
+ }
+
+ @SuppressWarnings("unchecked")
+ private E newRecordInstance() {
+ if (recordClass != GenericData.Record.class && !recordClass.isInterface()) {
+ E record = (E) ReflectData.newInstance(recordClass, schema);
+ if (record != null) {
+ return record;
+ }
+ }
+ return (E) new GenericData.Record(schema);
+ }
+
+ private void fillIndexed(IndexedRecord record, String[] data) {
+ for (int i = 0; i < indexes.length; i += 1) {
+ int index = indexes[i];
+ record.put(i,
+ makeValue(index < data.length ? data[index] : null, fields[i]));
+ }
+ }
+
+ private void fillReflect(Object record, String[] data) {
+ for (int i = 0; i < indexes.length; i += 1) {
+ Schema.Field field = fields[i];
+ int index = indexes[i];
+ Object value = makeValue(index < data.length ? data[index] : null, field);
+ ReflectData.get().setField(record, field.name(), i, value);
+ }
+ }
+
+ private static Object makeValue(String string, Schema.Field field) {
+ try {
+ Object value = makeValue(string, field.schema());
+ if (value != null || Schemas.nullOk(field.schema())) {
+ return value;
+ } else {
+ // this will fail if there is no default value
+ return ReflectData.get().getDefaultValue(field);
+ }
+ } catch (RecordException e) {
+ // add the field name to the error message
+ throw new RecordException(String.format(
+ "Cannot convert field %s", field.name()), e);
+ } catch (NumberFormatException e) {
+ throw new RecordException(String.format(
+ "Field %s: value not a %s: '%s'",
+ field.name(), field.schema(), string), e);
+ } catch (AvroRuntimeException e) {
+ throw new RecordException(String.format(
+ "Field %s: cannot make %s value: '%s'",
+ field.name(), field.schema(), string), e);
+ }
+ }
+
+ /**
+ * Returns a the value as the first matching schema type or null.
+ *
+ * Note that if the value may be null even if the schema does not allow the
+ * value to be null.
+ *
+ * @param string a String representation of the value
+ * @param schema a Schema
+ * @return the string coerced to the correct type from the schema or null
+ */
+ private static Object makeValue(String string, Schema schema) {
+ if (string == null) {
+ return null;
+ }
+
+ try {
+ switch (schema.getType()) {
+ case BOOLEAN:
+ return Boolean.valueOf(string);
+ case STRING:
+ return string;
+ case FLOAT:
+ return Float.valueOf(string);
+ case DOUBLE:
+ return Double.valueOf(string);
+ case INT:
+ return Integer.valueOf(string);
+ case LONG:
+ return Long.valueOf(string);
+ case ENUM:
+ // TODO: translate to enum class
+ if (schema.hasEnumSymbol(string)) {
+ return string;
+ } else {
+ try {
+ return schema.getEnumSymbols().get(Integer.parseInt(string));
+ } catch (IndexOutOfBoundsException ex) {
+ return null;
+ }
+ }
+ case UNION:
+ Object value = null;
+ for (Schema possible : schema.getTypes()) {
+ value = makeValue(string, possible);
+ if (value != null) {
+ return value;
+ }
+ }
+ return null;
+ case NULL:
+ return null;
+ default:
+ // FIXED, BYTES, MAP, ARRAY, RECORD are not supported
+ throw new RecordException(
+ "Unsupported field type:" + schema.getType());
+ }
+ } catch (NumberFormatException e) {
+ // empty string is considered null for numeric types
+ if (string.isEmpty()) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
new file mode 100644
index 0000000..f17ee83
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BinaryNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.NumericNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.parquet.cli.util.RecordException;
+import org.apache.parquet.cli.util.RuntimeIOException;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AvroJson {
+
+  private static final JsonFactory FACTORY = new JsonFactory();
+
+  /**
+   * Creates an iterator over the JSON values read from a stream.
+   *
+   * @param stream the stream to read JSON values from
+   * @return an iterator of parsed JSON trees
+   * @throws RuntimeIOException if the parser cannot be created or the
+   *                            values cannot be read from the stream
+   */
+  public static Iterator<JsonNode> parser(final InputStream stream) {
+    try {
+      JsonParser jsonParser = FACTORY.createParser(stream);
+      // a codec is set on the parser before reading values as trees
+      jsonParser.setCodec(new ObjectMapper());
+      Iterator<JsonNode> nodes = jsonParser.readValuesAs(JsonNode.class);
+      return nodes;
+    } catch (IOException ioFailure) {
+      throw new RuntimeIOException("Cannot read from stream", ioFailure);
+    }
+  }
+
+ public static JsonNode parse(String json) {
+ return parse(json, JsonNode.class);
+ }
+
+  /**
+   * Parses a JSON string into an instance of the requested type.
+   *
+   * @param json the JSON text
+   * @param returnType the class to produce
+   * @return the parsed value
+   * @throws IllegalArgumentException if the text cannot be parsed or mapped
+   * @throws RuntimeIOException on any other I/O failure
+   */
+  public static <T> T parse(String json, Class<T> returnType) {
+    final ObjectMapper jsonMapper = new ObjectMapper();
+    final T parsed;
+    try {
+      parsed = jsonMapper.readValue(json, returnType);
+    } catch (JsonParseException malformed) {
+      throw new IllegalArgumentException("Invalid JSON", malformed);
+    } catch (JsonMappingException unmappable) {
+      throw new IllegalArgumentException("Invalid JSON", unmappable);
+    } catch (IOException ioFailure) {
+      throw new RuntimeIOException("Cannot initialize JSON parser", ioFailure);
+    }
+    return parsed;
+  }
+
+ public static JsonNode parse(InputStream json) {
+ return parse(json, JsonNode.class);
+ }
+
+  /**
+   * Parses the JSON content of a stream into an instance of the requested
+   * type.
+   *
+   * @param json a stream of JSON content
+   * @param returnType the class to produce
+   * @return the parsed value
+   * @throws IllegalArgumentException if the content cannot be parsed or mapped
+   * @throws RuntimeIOException on any other I/O failure
+   */
+  public static <T> T parse(InputStream json, Class<T> returnType) {
+    final ObjectMapper jsonMapper = new ObjectMapper();
+    final T parsed;
+    try {
+      parsed = jsonMapper.readValue(json, returnType);
+    } catch (JsonParseException malformed) {
+      throw new IllegalArgumentException("Invalid JSON stream", malformed);
+    } catch (JsonMappingException unmappable) {
+      throw new IllegalArgumentException("Invalid JSON stream", unmappable);
+    } catch (IOException ioFailure) {
+      throw new RuntimeIOException("Cannot initialize JSON parser", ioFailure);
+    }
+    return parsed;
+  }
+
+  /**
+   * Converts a JSON tree into an Avro datum that conforms to a schema.
+   *
+   * Records, maps, arrays and unions are converted recursively. Each other
+   * type first checks that the JSON node has a compatible node type and then
+   * extracts the value.
+   *
+   * @param model the data model used to create records, enums and fixed values
+   * @param datum the JSON value to convert; may be null
+   * @param schema the schema the result must conform to
+   * @return the converted value, or null if {@code datum} is null or the
+   *         schema type is NULL
+   * @throws RecordException if binary content cannot be read, or (through
+   *         {@code RecordException.check}) if the JSON value does not fit
+   *         the schema
+   * @throws IllegalArgumentException if the schema type is not recognized
+   */
+  public static Object convertToAvro(GenericData model, JsonNode datum,
+                                     Schema schema) {
+    if (datum == null) {
+      return null;
+    }
+    switch (schema.getType()) {
+      case RECORD:
+        RecordException.check(datum.isObject(),
+            "Cannot convert non-object to record: %s", datum);
+        Object record = model.newRecord(null, schema);
+        for (Schema.Field field : schema.getFields()) {
+          // fields missing from the JSON object are passed on as null
+          model.setField(record, field.name(), field.pos(),
+              convertField(model, datum.get(field.name()), field));
+        }
+        return record;
+
+      case MAP:
+        RecordException.check(datum.isObject(),
+            "Cannot convert non-object to map: %s", datum);
+        // a linked map keeps the entries in the order they appear in the JSON
+        Map<String, Object> map = Maps.newLinkedHashMap();
+        Iterator<Map.Entry<String, JsonNode>> iter = datum.fields();
+        while (iter.hasNext()) {
+          Map.Entry<String, JsonNode> entry = iter.next();
+          map.put(entry.getKey(), convertToAvro(
+              model, entry.getValue(), schema.getValueType()));
+        }
+        return map;
+
+      case ARRAY:
+        RecordException.check(datum.isArray(),
+            "Cannot convert to array: %s", datum);
+        List<Object> list = Lists.newArrayListWithExpectedSize(datum.size());
+        for (JsonNode element : datum) {
+          list.add(convertToAvro(model, element, schema.getElementType()));
+        }
+        return list;
+
+      case UNION:
+        // pick the branch that fits the datum, then convert with that branch
+        return convertToAvro(model, datum,
+            resolveUnion(datum, schema.getTypes()));
+
+      case BOOLEAN:
+        RecordException.check(datum.isBoolean(),
+            "Cannot convert to boolean: %s", datum);
+        return datum.booleanValue();
+
+      case FLOAT:
+        RecordException.check(datum.isFloat() || datum.isInt(),
+            "Cannot convert to float: %s", datum);
+        return datum.floatValue();
+
+      case DOUBLE:
+        RecordException.check(
+            datum.isDouble() || datum.isFloat() ||
+            datum.isLong() || datum.isInt(),
+            "Cannot convert to double: %s", datum);
+        return datum.doubleValue();
+
+      case INT:
+        RecordException.check(datum.isInt(),
+            "Cannot convert to int: %s", datum);
+        return datum.intValue();
+
+      case LONG:
+        RecordException.check(datum.isLong() || datum.isInt(),
+            "Cannot convert to long: %s", datum);
+        return datum.longValue();
+
+      case STRING:
+        RecordException.check(datum.isTextual(),
+            "Cannot convert to string: %s", datum);
+        return datum.textValue();
+
+      case ENUM:
+        RecordException.check(datum.isTextual(),
+            "Cannot convert to enum: %s", datum);
+        return model.createEnum(datum.textValue(), schema);
+
+      case BYTES:
+        RecordException.check(datum.isBinary(),
+            "Cannot convert to binary: %s", datum);
+        try {
+          return ByteBuffer.wrap(datum.binaryValue());
+        } catch (IOException e) {
+          throw new RecordException("Failed to read JSON binary", e);
+        }
+
+      case FIXED:
+        RecordException.check(datum.isBinary(),
+            "Cannot convert to fixed: %s", datum);
+        byte[] bytes;
+        try {
+          bytes = datum.binaryValue();
+        } catch (IOException e) {
+          throw new RecordException("Failed to read JSON binary", e);
+        }
+        // reject input that has fewer bytes than the fixed type requires
+        // NOTE(review): longer input is not rejected here; confirm how
+        // createFixed treats bytes beyond the fixed size
+        RecordException.check(bytes.length >= schema.getFixedSize(),
+            "Binary data is too short: %s bytes for %s", bytes.length, schema);
+        return model.createFixed(null, bytes, schema);
+
+      case NULL:
+        return null;
+
+      default:
+        // don't use RecordException because this is a Schema problem
+        throw new IllegalArgumentException("Unknown schema type: " + schema);
+    }
+  }
+
+  /**
+   * Converts the JSON value of one record field, falling back to the field's
+   * default when the conversion produces null and {@code Schemas.nullOk}
+   * reports that the field schema does not accept null.
+   *
+   * @param model the data model used for conversion and default lookup
+   * @param datum the JSON value for the field; may be null
+   * @param field the record field being populated
+   * @return the converted value or the field default
+   * @throws RecordException if the value cannot be converted; the message
+   *         names the field
+   */
+  private static Object convertField(GenericData model, JsonNode datum,
+                                     Schema.Field field) {
+    try {
+      Object converted = convertToAvro(model, datum, field.schema());
+      if (converted == null && !Schemas.nullOk(field.schema())) {
+        return model.getDefaultValue(field);
+      }
+      return converted;
+    } catch (RecordException conversionFailure) {
+      // add the field name to the error message
+      String message = String.format(
+          "Cannot convert field %s", field.name());
+      throw new RecordException(message, conversionFailure);
+    } catch (AvroRuntimeException avroFailure) {
+      String message = String.format(
+          "Field %s: cannot make %s value: '%s'",
+          field.name(), field.schema(), String.valueOf(datum));
+      throw new RecordException(message, avroFailure);
+    }
+  }
+
+ private static Schema resolveUnion(JsonNode datum, Collection<Schema> schemas) {
+ Set<Schema.Type> primitives = Sets.newHashSet();
+ List<Schema> others = Lists.newArrayList();
+ for (Schema schema : schemas) {
+ if (PRIMITIVES.containsKey(schema.getType())) {
+ primitives.add(schema.getType());
+ } else {
+ others.add(schema);
+ }
+ }
+
+ // Try to identify specific primitive types
+ Schema primitiveSchema = null;
+ if (datum == null || datum.isNull()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.NULL);
+ } else if (datum.isShort() || datum.isInt()) {
+ primitiveSchema = closestPrimitive(primitives,
+ Schema.Type.INT, Schema.Type.LONG,
+ Schema.Type.FLOAT, Schema.Type.DOUBLE);
+ } else if (datum.isLong()) {
+ primitiveSchema = closestPrimitive(primitives,
+ Schema.Type.LONG, Schema.Type.DOUBLE);
+ } else if (datum.isFloat()) {
+ primitiveSchema = closestPrimitive(primitives,
+ Schema.Type.FLOAT, Schema.Type.DOUBLE);
+ } else if (datum.isDouble()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.DOUBLE);
+ } else if (datum.isBoolean()) {
+ primitiveSchema = closestPrimitive(primitives, Schema.Type.BOOLEAN);
+ }
+
+ if (primitiveSchema != null) {
+ return primitiveSchema;
+ }
+
+ // otherwise, select the first schema that matches the datum
+ for (Schema schema : others) {
+ if (matches(datum, schema)) {
+ return schema;
+ }
+ }
+
+ throw new RecordException(String.format(
+ "Cannot resolve union: %s not in %s", datum, schemas));
+ }
+
+ // this does not contain string, bytes, or fixed because the datum type
+ // doesn't necessarily determine the schema.
+ private static ImmutableMap<Schema.Type, Schema> PRIMITIVES = ImmutableMap
+ .<Schema.Type, Schema>builder()
+ .put(Schema.Type.NULL, Schema.create(Schema.Type.NULL))
+ .put(Schema.Type.BOOLEAN, Schema.create(Schema.Type.BOOLEAN))
+ .put(Schema.Type.INT, Schema.create(Schema.Type.INT))
+ .put(Schema.Type.LONG, Schema.create(Schema.Type.LONG))
+ .put(Schema.Type.FLOAT, Schema.create(Schema.Type.FLOAT))
+ .put(Schema.Type.DOUBLE, Schema.create(Schema.Type.DOUBLE))
+ .build();
+
+  /**
+   * Returns the shared primitive schema for the first of the given types that
+   * is both allowed and present in {@code PRIMITIVES}.
+   *
+   * @param possible the primitive types that are allowed
+   * @param types candidate types in order of preference
+   * @return the schema for the first usable candidate, or null if none is
+   */
+  private static Schema closestPrimitive(Set<Schema.Type> possible, Schema.Type... types) {
+    for (Schema.Type candidate : types) {
+      if (!possible.contains(candidate)) {
+        continue;
+      }
+      Schema primitive = PRIMITIVES.get(candidate);
+      if (primitive != null) {
+        return primitive;
+      }
+    }
+    return null;
+  }
+
+ private static boolean matches(JsonNode datum, Schema schema) {
+ switch (schema.getType()) {
+ case RECORD:
+ if (datum.isObject()) {
+ // check that each field is present or has a default
+ boolean missingField = false;
+ for (Schema.Field field : schema.getFields()) {
+ if (!datum.has(field.name()) && field.defaultValue() == null) {
+ missingField = true;
+ break;
+ }
+ }
+ if (!missingField) {
+ return true;
+ }
+ }
+ break;
+ case UNION:
+ if (resolveUnion(datum, schema.getTypes()) != null) {
+ return true;
+ }
+ break;
+ case MAP:
+ if (datum.isObject()) {
+ return true;
+ }
+ break;
+ case ARRAY:
+ if (datum.isArray()) {
+ return true;
+ }
+ break;
+ case BOOLEAN:
+ if (datum.isBoolean()) {
+ return true;
+ }
+ break;
+ case FLOAT:
+ if (datum.isFloat() || datum.isInt()) {
+ return true;
+ }
+ break;
+ case DOUBLE:
+ if (datum.isDouble() || datum.isFloat() ||
+ datum.isLong() || datum.isInt()) {
+ return true;
+ }
+ break;
+ case INT:
+ if (datum.isInt()) {
+ return true;
+ }
+ break;
+ case LONG:
+ if (datum.isLong() || datum.isInt()) {
+ return true;
+ }
+ break;
+ case STRING:
+ if (datum.isTextual()) {
+ return true;
+ }
+ break;
+ case ENUM:
+ if (datum.isTextual() && schema.hasEnumSymbol(datum.textValue())) {
+ return true;
+ }
+ break;
+ case BYTES:
+ case FIXED:
+ if (datum.isBinary()) {
+ return true;
+ }
+ break;
+ case NULL:
+ if (datum == null || datum.isNull()) {
+ return true;
+ }
+ break;
+ default: // UNION or unknown
+ throw new IllegalArgumentException("Unsupported schema: " + schema);
+ }
+ return false;
+ }
+
+  /**
+   * Infers a schema from the leading JSON values of a stream by inferring a
+   * schema for each value and merging the results.
+   *
+   * The first value is always used when the stream has one, even if
+   * {@code numRecords} is less than 1.
+   *
+   * @param incoming a stream of JSON values
+   * @param name the name passed on when inferring each value's schema
+   * @param numRecords the maximum number of values to examine
+   * @return the merged schema, or null if the stream contains no values
+   */
+  public static Schema inferSchema(InputStream incoming, final String name,
+                                   int numRecords) {
+    Function<JsonNode, Schema> toSchema = new Function<JsonNode, Schema>() {
+      @Override
+      public Schema apply(JsonNode node) {
+        return inferSchema(node, name);
+      }
+    };
+    Iterator<Schema> schemas = Iterators.transform(parser(incoming), toSchema);
+
+    if (!schemas.hasNext()) {
+      return null;
+    }
+
+    Schema merged = schemas.next();
+    int used = 1;
+    while (schemas.hasNext() && used < numRecords) {
+      merged = Schemas.merge(merged, schemas.next());
+      used += 1;
+    }
+
+    return merged;
+  }
+
+ public static Schema inferSchema(JsonNode node, String name) {
+ return visit(node, new JsonSchemaVisitor(name));
+ }
+
+ public static Schema inferSchemaWithMaps(JsonNode node, String name) {
+ return visit(node, new JsonSchemaVisitor(name).useMaps());
+ }
+
+ private static class JsonSchemaVisitor extends JsonTreeVisitor<Schema> {
+
+ private static final Joiner DOT = Joiner.on('.');
+ private final String name;
+ private boolean objectsToRecords = true;
+
+ public JsonSchemaVisitor(String name) {
+ this.name = name;
+ }
+
+ public JsonSchemaVisitor useMaps() {
+ this.objectsToRecords = false;
+ return this;
+ }
+
+ @Override
+ public Schema object(ObjectNode object, Map<String, Schema> fields) {
+ if (objectsToRecords || recordLevels.size() < 1) {
+ List<Schema.Field> recordFields = Lists.newArrayListWithExpectedSize(
+ fields.size());
+
+ for (Map.Entry<String, Schema> entry : fields.entrySet()) {
+ recordFields.add(new Schema.Field(
+ entry.getKey(), entry.getValue(),
+ "Type inferred from '" + object.get(entry.getKey()) + "'",
+ null));
+ }
+
+ Schema recordSchema;
+ if (recordLevels.size() < 1) {
+ recordSchema = Schema.createRecord(name, null, null, false);
+ } else {
+ recordSchema = Schema.createRecord(
+ DOT.join(recordLevels), null, null, false);
+ }
+
+ recordSchema.setFields(recordFields);
+
+ return recordSchema;
+
+ } else {
+ // translate to a map; use LinkedHashSet to preserve schema order
+ switch (fields.size()) {
+ case 0:
+ return Schema.createMap(Schema.create(Schema.Type.NULL));
+ case 1:
+ return Schema.createMap(Iterables.getOnlyElement(fields.values()));
+ default:
+ return Schema.createMap(Schemas.mergeOrUnion(fields.values()));
+ }
+ }
+ }
+
+ @Override
+ public Schema array(ArrayNode ignored, List<Schema> elementSchemas) {
+ // use LinkedHashSet to preserve schema order
+ switch (elementSchemas.size()) {
+ case 0:
+ return Schema.createArray(Schema.create(Schema.Type.NULL));
+ case 1:
+ return Schema.createArray(Iterables.getOnlyElement(elementSchemas));
+ default:
+ return Schema.createArray(Schemas.mergeOrUnion(elementSchemas));
+ }
+ }
+
+ @Override
+ public Schema binary(BinaryNode ignored) {
+ return Schema.create(Schema.Type.BYTES);
+ }
+
+ @Override
+ public Schema text(TextNode ignored) {
+ return Schema.create(Schema.Type.STRING);
+ }
+
+ @Override
+ public Schema number(NumericNode number) {
+ if (number.isInt()) {
+ return Schema.create(Schema.Type.INT);
+ } else if (number.isLong()) {
+ return Schema.create(Schema.Type.LONG);
+ } else if (number.isFloat()) {
+ return Schema.create(Schema.Type.FLOAT);
+ } else if (number.isDouble()) {
+ return Schema.create(Schema.Type.DOUBLE);
+ } else {
+ throw new UnsupportedOperationException(
+ number.getClass().getName() + " is not supported");
+ }
+ }
+
+ @Override
+ public Schema bool(BooleanNode ignored) {
+ return Schema.create(Schema.Type.BOOLEAN);
+ }
+
+ @Override
+ public Schema nullNode(NullNode ignored) {
+ return Schema.create(Schema.Type.NULL);
+ }
+
+ @Override
+ public Schema missing(MissingNode ignored) {
+ throw new UnsupportedOperationException("MissingNode is not supported.");
+ }
+ }
+
+  /**
+   * Walks a JSON tree depth-first and hands each node to the matching
+   * visitor callback. Children of objects and arrays are visited before
+   * their parent, and their results are passed to the parent's callback.
+   *
+   * While an object field's value is being visited, the field name is pushed
+   * onto {@code visitor.recordLevels}, so the innermost field name is at the
+   * head of that list.
+   *
+   * @param node the node to visit
+   * @param visitor the callbacks to invoke
+   * @return the value produced by the visitor for {@code node}
+   * @throws IllegalArgumentException if the node type is not handled or the
+   *         node class does not agree with its reported type
+   */
+  private static <T> T visit(JsonNode node, JsonTreeVisitor<T> visitor) {
+    switch (node.getNodeType()) {
+      case OBJECT:
+        Preconditions.checkArgument(node instanceof ObjectNode,
+            "Expected instance of ObjectNode: " + node);
+        return visitor.object((ObjectNode) node, visitFields(node, visitor));
+
+      case ARRAY:
+        Preconditions.checkArgument(node instanceof ArrayNode,
+            "Expected instance of ArrayNode: " + node);
+        return visitor.array((ArrayNode) node, visitElements(node, visitor));
+
+      case BINARY:
+        Preconditions.checkArgument(node instanceof BinaryNode,
+            "Expected instance of BinaryNode: " + node);
+        return visitor.binary((BinaryNode) node);
+
+      case STRING:
+        Preconditions.checkArgument(node instanceof TextNode,
+            "Expected instance of TextNode: " + node);
+        return visitor.text((TextNode) node);
+
+      case NUMBER:
+        Preconditions.checkArgument(node instanceof NumericNode,
+            "Expected instance of NumericNode: " + node);
+        return visitor.number((NumericNode) node);
+
+      case BOOLEAN:
+        Preconditions.checkArgument(node instanceof BooleanNode,
+            "Expected instance of BooleanNode: " + node);
+        return visitor.bool((BooleanNode) node);
+
+      case MISSING:
+        Preconditions.checkArgument(node instanceof MissingNode,
+            "Expected instance of MissingNode: " + node);
+        return visitor.missing((MissingNode) node);
+
+      case NULL:
+        Preconditions.checkArgument(node instanceof NullNode,
+            "Expected instance of NullNode: " + node);
+        return visitor.nullNode((NullNode) node);
+
+      default:
+        throw new IllegalArgumentException(
+            "Unknown node type: " + node.getNodeType() + ": " + node);
+    }
+  }
+
+  /** Visits every field value of an object node, keyed by field name. */
+  private static <T> Map<String, T> visitFields(JsonNode objectNode,
+                                                JsonTreeVisitor<T> visitor) {
+    // use LinkedHashMap to preserve field order
+    Map<String, T> results = Maps.newLinkedHashMap();
+    Iterator<Map.Entry<String, JsonNode>> entries = objectNode.fields();
+    while (entries.hasNext()) {
+      Map.Entry<String, JsonNode> field = entries.next();
+      String fieldName = field.getKey();
+
+      visitor.recordLevels.push(fieldName);
+      results.put(fieldName, visit(field.getValue(), visitor));
+      visitor.recordLevels.pop();
+    }
+    return results;
+  }
+
+  /** Visits every element of an array node, in order. */
+  private static <T> List<T> visitElements(JsonNode arrayNode,
+                                           JsonTreeVisitor<T> visitor) {
+    List<T> results = Lists.newArrayListWithExpectedSize(arrayNode.size());
+    for (JsonNode element : arrayNode) {
+      results.add(visit(element, visitor));
+    }
+    return results;
+  }
+
+  /**
+   * Base class for callbacks invoked while walking a JSON tree. Every
+   * callback returns null unless a subclass overrides it.
+   *
+   * @param <T> the type of value produced for each node
+   */
+  private abstract static class JsonTreeVisitor<T> {
+    // names of the object fields enclosing the node currently being visited
+    protected LinkedList<String> recordLevels = new LinkedList<String>();
+
+    /** Called for an object node with the results for its field values. */
+    public T object(ObjectNode object, Map<String, T> fields) {
+      return null;
+    }
+
+    /** Called for an array node with the results for its elements. */
+    public T array(ArrayNode array, List<T> elements) {
+      return null;
+    }
+
+    /** Called for a binary node. */
+    public T binary(BinaryNode binary) {
+      return null;
+    }
+
+    /** Called for a text node. */
+    public T text(TextNode text) {
+      return null;
+    }
+
+    /** Called for a numeric node. */
+    public T number(NumericNode number) {
+      return null;
+    }
+
+    /** Called for a boolean node. */
+    public T bool(BooleanNode bool) {
+      return null;
+    }
+
+    /** Called for a missing node. */
+    public T missing(MissingNode missing) {
+      return null;
+    }
+
+    /** Called for a JSON null node. */
+    public T nullNode(NullNode nullNode) {
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java
new file mode 100644
index 0000000..a3b067d
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.parquet.cli.util.RuntimeIOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * Reads JSON values from a stream and converts each one to an Avro datum
+ * using a fixed schema.
+ *
+ * The reader is its own iterator, so {@link #iterator()} does not start a new
+ * pass over the data. After {@link #close()} the reader behaves like an
+ * exhausted iterator.
+ *
+ * @param <E> the type of the converted values
+ */
+public class AvroJsonReader<E> implements Iterator<E>, Iterable<E>, Closeable {
+
+  private final GenericData model;
+  private final Schema schema;
+  private final InputStream stream;
+  // set to null once the reader has been closed
+  private Iterator<E> iterator;
+
+  /**
+   * @param stream the stream of JSON values to read
+   * @param schema the schema every value is converted with
+   */
+  public AvroJsonReader(InputStream stream, Schema schema) {
+    this.stream = stream;
+    this.schema = schema;
+    this.model = GenericData.get();
+    this.iterator = Iterators.transform(AvroJson.parser(stream),
+        new Function<JsonNode, E>() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public E apply(@Nullable JsonNode node) {
+            return (E) AvroJson.convertToAvro(
+                model, node, AvroJsonReader.this.schema);
+          }
+        });
+  }
+
+  /**
+   * @return true if another value is available; always false after close
+   */
+  @Override
+  public boolean hasNext() {
+    return iterator != null && iterator.hasNext();
+  }
+
+  /**
+   * @return the next converted value
+   * @throws java.util.NoSuchElementException if the reader has been closed
+   */
+  @Override
+  public E next() {
+    if (iterator == null) {
+      // fail with the iterator contract's exception instead of a
+      // NullPointerException when used after close
+      throw new java.util.NoSuchElementException("Reader is closed");
+    }
+    return iterator.next();
+  }
+
+  /**
+   * Releases the iterator and closes the underlying stream.
+   *
+   * @throws RuntimeIOException if the stream cannot be closed
+   */
+  @Override
+  public void close() {
+    iterator = null;
+    try {
+      stream.close();
+    } catch (IOException e) {
+      throw new RuntimeIOException("Cannot close reader", e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Remove is not implemented.");
+  }
+
+  /**
+   * @return this reader; values already consumed are not replayed
+   */
+  @Override
+  public Iterator<E> iterator() {
+    return this;
+  }
+}