Posted to commits@parquet.apache.org by bl...@apache.org on 2017/07/28 23:25:28 UTC

[3/4] parquet-mr git commit: PARQUET-777: Add Parquet CLI.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java
new file mode 100644
index 0000000..7f82874
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Codecs;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.apache.parquet.cli.util.Expressions.filterSchema;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+@Parameters(commandDescription="Create a Parquet file from a data file")
+public class ConvertCommand extends BaseCommand {
+
+  public ConvertCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<file>")
+  List<String> targets;
+
+  @Parameter(
+      names={"-o", "--output"},
+      description="Output file path",
+      required=true)
+  String outputPath = null;
+
+  @Parameter(names = {"-s", "--schema"},
+      description = "The file containing the Avro schema.")
+  String avroSchemaFile;
+
+  @Parameter(
+      names = {"-c", "--column", "--columns"},
+      description = "List of columns")
+  List<String> columns;
+
+  @Parameter(names = {"--compression-codec"},
+      description = "A compression codec name.")
+  String compressionCodecName = "GZIP";
+
+  @Parameter(
+      names={"--overwrite"},
+      description="Overwrite the output file if it exists")
+  boolean overwrite = false;
+
+  @Parameter(
+      names={"-2", "--format-version-2", "--writer-version-2"},
+      description="Use Parquet format version 2",
+      hidden = true)
+  boolean v2 = false;
+
+  @Parameter(names="--row-group-size", description="Target row group size")
+  int rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
+
+  @Parameter(names="--page-size", description="Target page size")
+  int pageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+  @Parameter(names="--dictionary-size", description="Max dictionary page size")
+  int dictionaryPageSize = ParquetWriter.DEFAULT_PAGE_SIZE;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() == 1,
+        "A data file is required.");
+
+    String source = targets.get(0);
+
+    CompressionCodecName codec = Codecs.parquetCodec(compressionCodecName);
+
+    Schema schema;
+    if (avroSchemaFile != null) {
+      schema = Schemas.fromAvsc(open(avroSchemaFile));
+    } else {
+      schema = getAvroSchema(source);
+    }
+    Schema projection = filterSchema(schema, columns);
+
+    Path outPath = qualifiedPath(outputPath);
+    FileSystem outFS = outPath.getFileSystem(getConf());
+    if (overwrite && outFS.exists(outPath)) {
+      console.debug("Deleting output file {} (already exists)", outPath);
+      outFS.delete(outPath);
+    }
+
+    Iterable<Record> reader = openDataFile(source, projection);
+    boolean threw = true;
+    long count = 0;
+    try {
+      try (ParquetWriter<Record> writer = AvroParquetWriter
+          .<Record>builder(qualifiedPath(outputPath))
+          .withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
+          .withConf(getConf())
+          .withCompressionCodec(codec)
+          .withRowGroupSize(rowGroupSize)
+          .withDictionaryPageSize(dictionaryPageSize < 64 ? 64 : dictionaryPageSize)
+          .withDictionaryEncoding(dictionaryPageSize != 0)
+          .withPageSize(pageSize)
+          .withDataModel(GenericData.get())
+          .withSchema(projection)
+          .build()) {
+        for (Record record : reader) {
+          writer.write(record);
+          count += 1;
+        }
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count, e);
+    } finally {
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
+      }
+    }
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Create a Parquet file from an Avro file",
+        "sample.avro -o sample.parquet",
+        "# Create a Parquet file in S3 from a local Avro file",
+        "path/to/sample.avro -o s3:/user/me/sample.parquet",
+        "# Create a Parquet file from Avro data in S3",
+        "s3:/data/path/sample.avro -o sample.parquet"
+    );
+  }
+}
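
For quick reference, a minimal sketch of the writer setup ConvertCommand uses above, assuming the same imports plus org.apache.hadoop.conf.Configuration and java.io.File; the file names and the "field_0" field are hypothetical, and the snippet would live in a method that throws IOException:

    Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
    try (ParquetWriter<Record> writer = AvroParquetWriter
        .<Record>builder(new Path("sample.parquet"))
        .withSchema(schema)
        .withDataModel(GenericData.get())
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withConf(new Configuration())
        .build()) {
      Record record = new Record(schema);
      record.put("field_0", "value");  // hypothetical field name
      writer.write(record);
    }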

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
new file mode 100644
index 0000000..0bd77a3
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.commons.lang.StringUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.cli.Util.encodingStatsAsString;
+import static org.apache.parquet.cli.Util.encodingsAsString;
+import static org.apache.parquet.cli.Util.humanReadable;
+import static org.apache.parquet.cli.Util.minMaxAsString;
+import static org.apache.parquet.cli.Util.primitive;
+import static org.apache.parquet.cli.Util.shortCodec;
+
+@Parameters(commandDescription="Print a Parquet file's metadata")
+public class ParquetMetadataCommand extends BaseCommand {
+
+  public ParquetMetadataCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<parquet path>")
+  List<String> targets;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() >= 1,
+        "A Parquet file is required.");
+    Preconditions.checkArgument(targets.size() == 1,
+        "Cannot process multiple Parquet files.");
+
+    String source = targets.get(0);
+    ParquetMetadata footer = ParquetFileReader.readFooter(
+        getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER);
+
+    console.info("\nFile path:  {}", source);
+    console.info("Created by: {}", footer.getFileMetaData().getCreatedBy());
+
+    Map<String, String> kv = footer.getFileMetaData().getKeyValueMetaData();
+    if (kv != null && !kv.isEmpty()) {
+      console.info("Properties:");
+      String format = "  %" + maxSize(kv.keySet()) + "s: %s";
+      for (Map.Entry<String, String> entry : kv.entrySet()) {
+        console.info(String.format(format, entry.getKey(), entry.getValue()));
+      }
+    } else {
+      console.info("Properties: (none)");
+    }
+
+    MessageType schema = footer.getFileMetaData().getSchema();
+    console.info("Schema:\n{}", schema);
+
+    List<BlockMetaData> rowGroups = footer.getBlocks();
+    for (int index = 0, n = rowGroups.size(); index < n; index += 1) {
+      printRowGroup(console, index, rowGroups.get(index), schema);
+    }
+
+    console.info("");
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList();
+  }
+
+  private int maxSize(Iterable<String> strings) {
+    int size = 0;
+    for (String s : strings) {
+      size = Math.max(size, s.length());
+    }
+    return size;
+  }
+
+  private void printRowGroup(Logger console, int index, BlockMetaData rowGroup, MessageType schema) {
+    long start = rowGroup.getStartingPos();
+    long rowCount = rowGroup.getRowCount();
+    long compressedSize = rowGroup.getCompressedSize();
+    long uncompressedSize = rowGroup.getTotalByteSize();
+    String filePath = rowGroup.getPath();
+
+    console.info(String.format("\nRow group %d:  count: %d  %s records  start: %d  total: %s%s\n%s",
+        index, rowCount,
+        humanReadable(((float) compressedSize) / rowCount),
+        start, humanReadable(compressedSize),
+        filePath != null ? " path: " + filePath : "",
+        StringUtils.leftPad("", 80, '-')));
+
+    int size = maxSize(Iterables.transform(rowGroup.getColumns(),
+        new Function<ColumnChunkMetaData, String>() {
+          @Override
+          public String apply(@Nullable ColumnChunkMetaData input) {
+            return input == null ? "" : input.getPath().toDotString();
+          }
+        }));
+
+    console.info(String.format("%-" + size + "s  %-9s %-9s %-9s %-10s %-7s %s",
+        "", "type", "encodings", "count", "avg size", "nulls", "min / max"));
+    for (ColumnChunkMetaData column : rowGroup.getColumns()) {
+      printColumnChunk(console, size, column, schema);
+    }
+  }
+
+  private void printColumnChunk(Logger console, int width, ColumnChunkMetaData column, MessageType schema) {
+    String[] path = column.getPath().toArray();
+    PrimitiveType type = primitive(schema, path);
+    Preconditions.checkNotNull(type);
+
+    ColumnDescriptor desc = schema.getColumnDescription(path);
+    long size = column.getTotalSize();
+    long count = column.getValueCount();
+    float perValue = ((float) size) / count;
+    CompressionCodecName codec = column.getCodec();
+    Set<Encoding> encodings = column.getEncodings();
+    EncodingStats encodingStats = column.getEncodingStats();
+    String encodingSummary = encodingStats == null ?
+        encodingsAsString(encodings, desc) :
+        encodingStatsAsString(encodingStats);
+    Statistics stats = column.getStatistics();
+
+    String name = column.getPath().toDotString();
+
+    PrimitiveType.PrimitiveTypeName typeName = type.getPrimitiveTypeName();
+    if (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+      console.info(String.format("%-" + width + "s  FIXED[%d] %s %-7s %-9d %-8s %-7s %s",
+          name, type.getTypeLength(), shortCodec(codec), encodingSummary, count,
+          humanReadable(perValue), stats == null ? "" : String.valueOf(stats.getNumNulls()),
+          minMaxAsString(stats, type.getOriginalType())));
+    } else {
+      console.info(String.format("%-" + width + "s  %-9s %s %-7s %-9d %-10s %-7s %s",
+          name, typeName, shortCodec(codec), encodingSummary, count, humanReadable(perValue),
+          stats == null ? "" : String.valueOf(stats.getNumNulls()),
+          minMaxAsString(stats, type.getOriginalType())));
+    }
+  }
+}
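
A minimal sketch of the footer read that drives this command, assuming the imports above plus org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.Path; "sample.parquet" is a hypothetical path:

    Configuration conf = new Configuration();
    ParquetMetadata footer = ParquetFileReader.readFooter(
        conf, new Path("sample.parquet"), ParquetMetadataConverter.NO_FILTER);
    System.out.println("created by: " + footer.getFileMetaData().getCreatedBy());
    for (BlockMetaData rowGroup : footer.getBlocks()) {
      System.out.println("rows: " + rowGroup.getRowCount()
          + "  compressed: " + rowGroup.getCompressedSize()
          + "  total: " + rowGroup.getTotalByteSize());
    }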

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
new file mode 100644
index 0000000..ea2306f
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Formats;
+import org.apache.avro.file.SeekableInput;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+@Parameters(commandDescription="Print the Avro schema for a file")
+public class SchemaCommand extends BaseCommand {
+
+  public SchemaCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<parquet path>")
+  List<String> targets;
+
+  @Parameter(
+      names={"-o", "--output"},
+      description="Output file path")
+  String outputPath = null;
+
+  @Parameter(
+      names={"--overwrite"},
+      description="Overwrite the output file if it exists")
+  boolean overwrite = false;
+
+  @Parameter(
+      names={"--parquet"},
+      description="Print a Parquet schema, without converting to Avro",
+      hidden=true)
+  boolean parquetSchema = false;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() >= 1,
+        "A file is required.");
+
+    if (targets.size() > 1) {
+      Preconditions.checkArgument(outputPath == null,
+          "Cannot output multiple schemas to file " + outputPath);
+      for (String source : targets) {
+        console.info("{}: {}", source, getSchema(source));
+      }
+
+    } else {
+      String source = targets.get(0);
+
+      if (outputPath != null) {
+        Path outPath = qualifiedPath(outputPath);
+        FileSystem outFS = outPath.getFileSystem(getConf());
+        if (overwrite && outFS.exists(outPath)) {
+          console.debug("Deleting output file {} (already exists)", outPath);
+          outFS.delete(outPath);
+        }
+
+        try (OutputStream out = create(outputPath)) {
+          out.write(getSchema(source).getBytes(StandardCharsets.UTF_8));
+        }
+      } else {
+        console.info(getSchema(source));
+      }
+    }
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Print the Avro schema for a Parquet file",
+        "sample.parquet",
+        "# Print the Avro schema for an Avro file",
+        "sample.avro",
+        "# Print the Avro schema for a JSON file",
+        "sample.json"
+    );
+  }
+
+  private String getSchema(String source) throws IOException {
+    if (parquetSchema) {
+      return getParquetSchema(source);
+    } else {
+      return getAvroSchema(source).toString(true);
+    }
+  }
+
+  private String getParquetSchema(String source) throws IOException {
+    Formats.Format format;
+    try (SeekableInput in = openSeekable(source)) {
+      format = Formats.detectFormat((InputStream) in);
+      in.seek(0);
+
+      switch (format) {
+        case PARQUET:
+          return new ParquetFileReader(
+              getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)
+              .getFileMetaData().getSchema().toString();
+        default:
+          throw new IllegalArgumentException(String.format(
+              "Could not get a Parquet schema for format %s: %s", format, source));
+      }
+    }
+  }
+}
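
A minimal sketch of the Parquet branch of getParquetSchema() above, assuming the imports of the class plus org.apache.hadoop.conf.Configuration and org.apache.parquet.schema.MessageType; the path is hypothetical:

    Configuration conf = new Configuration();
    MessageType schema = new ParquetFileReader(
        conf, new Path("sample.parquet"), ParquetMetadataConverter.NO_FILTER)
        .getFileMetaData().getSchema();
    System.out.println(schema.toString());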

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
new file mode 100644
index 0000000..db427c9
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.Util;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.List;
+
+// TODO: show dictionary size in values and in bytes
+@Parameters(commandDescription="Print dictionaries for a Parquet column")
+public class ShowDictionaryCommand extends BaseCommand {
+
+  public ShowDictionaryCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<parquet path>")
+  List<String> targets;
+
+  @Parameter(
+      names = {"-c", "--column"},
+      description = "Column path",
+      required = true)
+  String column;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() >= 1,
+        "A Parquet file is required.");
+    Preconditions.checkArgument(targets.size() == 1,
+        "Cannot process multiple Parquet files.");
+
+    String source = targets.get(0);
+
+    ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
+    MessageType schema = reader.getFileMetaData().getSchema();
+    ColumnDescriptor descriptor = Util.descriptor(column, schema);
+    PrimitiveType type = Util.primitive(column, schema);
+    Preconditions.checkNotNull(type);
+
+    DictionaryPageReadStore dictionaryReader;
+    int rowGroup = 0;
+    while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
+      DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);
+
+      Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
+
+      console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
+      for (int i = 0; i <= dict.getMaxId(); i += 1) {
+        switch(type.getPrimitiveTypeName()) {
+          case BINARY:
+            if (type.getOriginalType() == OriginalType.UTF8) {
+              console.info("{}: {}", String.format("%6d", i),
+                  Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
+            } else {
+              console.info("{}: {}", String.format("%6d", i),
+                  Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
+            }
+            break;
+          case INT32:
+            console.info("{}: {}", String.format("%6d", i),
+              dict.decodeToInt(i));
+            break;
+          case INT64:
+            console.info("{}: {}", String.format("%6d", i),
+                dict.decodeToLong(i));
+            break;
+          case FLOAT:
+            console.info("{}: {}", String.format("%6d", i),
+                dict.decodeToFloat(i));
+            break;
+          case DOUBLE:
+            console.info("{}: {}", String.format("%6d", i),
+                dict.decodeToDouble(i));
+            break;
+          default:
+            throw new IllegalArgumentException(
+                "Unknown dictionary type: " + type.getPrimitiveTypeName());
+        }
+      }
+
+      reader.skipNextRowGroup();
+
+      rowGroup += 1;
+    }
+
+    console.info("");
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Show the dictionary for column 'col' from a Parquet file",
+        "-c col sample.parquet"
+    );
+  }
+}
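
A minimal sketch of the dictionary decoding loop above for a single row group, assuming the imports of the class plus org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.Path; "col" and the file name are hypothetical, and the column is assumed to be a dictionary-encoded UTF-8 string:

    ParquetFileReader reader = ParquetFileReader.open(
        new Configuration(), new Path("sample.parquet"));
    MessageType schema = reader.getFileMetaData().getSchema();
    ColumnDescriptor descriptor = Util.descriptor("col", schema);
    DictionaryPageReadStore dictionaries = reader.getNextDictionaryReader();
    if (dictionaries != null) {
      DictionaryPage page = dictionaries.readDictionaryPage(descriptor);
      Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
      for (int i = 0; i <= dict.getMaxId(); i += 1) {
        System.out.println(i + ": " + dict.decodeToBinary(i).toStringUsingUTF8());
      }
    }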

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
new file mode 100644
index 0000000..beda452
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.commons.lang.StringUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.Page;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.parquet.cli.Util.columnName;
+import static org.apache.parquet.cli.Util.descriptor;
+import static org.apache.parquet.cli.Util.encodingAsString;
+import static org.apache.parquet.cli.Util.humanReadable;
+import static org.apache.parquet.cli.Util.minMaxAsString;
+import static org.apache.parquet.cli.Util.primitive;
+import static org.apache.parquet.cli.Util.shortCodec;
+
+@Parameters(commandDescription="Print page summaries for a Parquet file")
+public class ShowPagesCommand extends BaseCommand {
+
+  public ShowPagesCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<parquet path>")
+  List<String> targets;
+
+  @Parameter(
+      names = {"-c", "--column", "--columns"},
+      description = "List of columns")
+  List<String> columns;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() >= 1,
+        "A Parquet file is required.");
+    Preconditions.checkArgument(targets.size() == 1,
+        "Cannot process multiple Parquet files.");
+
+    String source = targets.get(0);
+    ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
+
+    MessageType schema = reader.getFileMetaData().getSchema();
+    Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
+    if (this.columns == null || this.columns.isEmpty()) {
+      for (ColumnDescriptor descriptor : schema.getColumns()) {
+        columns.put(descriptor, primitive(schema, descriptor.getPath()));
+      }
+    } else {
+      for (String column : this.columns) {
+        columns.put(descriptor(column, schema), primitive(column, schema));
+      }
+    }
+
+    CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
+    // accumulate formatted lines to print by column
+    Map<String, List<String>> formatted = Maps.newLinkedHashMap();
+    PageFormatter formatter = new PageFormatter();
+    PageReadStore pageStore;
+    int rowGroupNum = 0;
+    while ((pageStore = reader.readNextRowGroup()) != null) {
+      for (ColumnDescriptor descriptor : columns.keySet()) {
+        List<String> lines = formatted.get(columnName(descriptor));
+        if (lines == null) {
+          lines = Lists.newArrayList();
+          formatted.put(columnName(descriptor), lines);
+        }
+
+        formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+        PageReader pages = pageStore.getPageReader(descriptor);
+
+        DictionaryPage dict = pages.readDictionaryPage();
+        if (dict != null) {
+          lines.add(formatter.format(dict));
+        }
+        DataPage page;
+        while ((page = pages.readPage()) != null) {
+          lines.add(formatter.format(page));
+        }
+      }
+      rowGroupNum += 1;
+    }
+
+    // TODO: Show total column size and overall size per value in the column summary line
+    for (String columnName : formatted.keySet()) {
+      console.info(String.format("\nColumn: %s\n%s", columnName, StringUtils.leftPad("", 80, '-')));
+      console.info(formatter.getHeader());
+      for (String line : formatted.get(columnName)) {
+        console.info(line);
+      }
+      console.info("");
+    }
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Show pages for column 'col' from a Parquet file",
+        "-c col sample.parquet"
+    );
+  }
+
+  private class PageFormatter implements DataPage.Visitor<String> {
+    private int rowGroupNum;
+    private int pageNum;
+    private PrimitiveType type;
+    private String shortCodec;
+
+    String getHeader() {
+      return String.format("  %-6s %-5s %-4s %-7s %-10s %-10s %-8s %-7s %s",
+          "page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max");
+    }
+
+    void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) {
+      this.rowGroupNum = rowGroupNum;
+      this.pageNum = 0;
+      this.type = type;
+      this.shortCodec = shortCodec(codec);
+    }
+
+    String format(Page page) {
+      String formatted = "";
+      if (page instanceof DictionaryPage) {
+        formatted = printDictionaryPage((DictionaryPage) page);
+      } else if (page instanceof DataPage) {
+        formatted = ((DataPage) page).accept(this);
+      }
+      pageNum += 1;
+      return formatted;
+    }
+
+    private String printDictionaryPage(DictionaryPage dict) {
+      // TODO: the compressed size of a dictionary page is lost in Parquet
+      dict.getUncompressedSize();
+      long totalSize = dict.getCompressedSize();
+      int count = dict.getDictionarySize();
+      float perValue = ((float) totalSize) / count;
+      String enc = encodingAsString(dict.getEncoding(), true);
+      if (pageNum == 0) {
+        return String.format("%3d-D    %-5s %s %-2s %-7d %-10s %-10s",
+            rowGroupNum, "dict", shortCodec, enc, count, humanReadable(perValue),
+            humanReadable(totalSize));
+      } else {
+        return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s",
+            rowGroupNum, pageNum, "dict", shortCodec, enc, count, humanReadable(perValue),
+            humanReadable(totalSize));
+      }
+    }
+
+    @Override
+    public String visit(DataPageV1 page) {
+      String enc = encodingAsString(page.getValueEncoding(), false);
+      long totalSize = page.getCompressedSize();
+      int count = page.getValueCount();
+      long numNulls = page.getStatistics().getNumNulls();
+      float perValue = ((float) totalSize) / count;
+      String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+      return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
+          rowGroupNum, pageNum, "data", shortCodec, enc, count, humanReadable(perValue),
+          humanReadable(totalSize), "", numNulls, minMax);
+    }
+
+    @Override
+    public String visit(DataPageV2 page) {
+      String enc = encodingAsString(page.getDataEncoding(), false);
+      long totalSize = page.getCompressedSize();
+      int count = page.getValueCount();
+      int numRows = page.getRowCount();
+      int numNulls = page.getNullCount();
+      float perValue = ((float) totalSize) / count;
+      String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+      String compression = (page.isCompressed() ? shortCodec : "_");
+      return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s",
+          rowGroupNum, pageNum, "data", compression, enc, count, humanReadable(perValue),
+          humanReadable(totalSize), numRows, numNulls, minMax);
+    }
+  }
+}
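
A minimal sketch of the per-column page walk above, assuming the imports of the class plus org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.Path; "col" and the file name are hypothetical:

    ParquetFileReader reader = ParquetFileReader.open(
        new Configuration(), new Path("sample.parquet"));
    MessageType schema = reader.getFileMetaData().getSchema();
    ColumnDescriptor col = descriptor("col", schema);
    PageReadStore rowGroup;
    while ((rowGroup = reader.readNextRowGroup()) != null) {
      PageReader pages = rowGroup.getPageReader(col);
      DataPage page;
      while ((page = pages.readPage()) != null) {
        System.out.println(page.getValueCount() + " values, "
            + page.getCompressedSize() + " compressed bytes");
      }
    }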

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
new file mode 100644
index 0000000..ceb11cf
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.cli.util.Codecs;
+import org.apache.parquet.cli.util.Schemas;
+import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.avro.generic.GenericData.Record;
+import static org.apache.parquet.cli.util.Expressions.filterSchema;
+
+@Parameters(commandDescription="Create an Avro file from a data file")
+public class ToAvroCommand extends BaseCommand {
+
+  public ToAvroCommand(Logger console) {
+    super(console);
+  }
+
+  @Parameter(description = "<file>")
+  List<String> targets;
+
+  @Parameter(
+      names={"-o", "--output"},
+      description="Output file path",
+      required=true)
+  String outputPath = null;
+
+  @Parameter(names = {"-s", "--schema"},
+      description = "The file containing an Avro schema for the output file")
+  String avroSchemaFile;
+
+  @Parameter(
+      names = {"-c", "--column", "--columns"},
+      description = "List of columns")
+  List<String> columns;
+
+  @Parameter(names = {"--compression-codec"},
+      description = "A compression codec name.")
+  String compressionCodecName = "GZIP";
+
+  @Parameter(
+      names={"--overwrite"},
+      description="Overwrite the output file if it exists")
+  boolean overwrite = false;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(targets != null && targets.size() == 1,
+        "A data file is required.");
+
+    String source = targets.get(0);
+
+    CodecFactory codecFactory = Codecs.avroCodec(compressionCodecName);
+
+    Schema schema;
+    if (avroSchemaFile != null) {
+      schema = Schemas.fromAvsc(open(avroSchemaFile));
+    } else {
+      schema = getAvroSchema(source);
+    }
+    Schema projection = filterSchema(schema, columns);
+
+    Path outPath = qualifiedPath(outputPath);
+    FileSystem outFS = outPath.getFileSystem(getConf());
+    if (overwrite && outFS.exists(outPath)) {
+      console.debug("Deleting output file {} (already exists)", outPath);
+      outFS.delete(outPath);
+    }
+
+    Iterable<Record> reader = openDataFile(source, projection);
+    boolean threw = true;
+    long count = 0;
+    try {
+      DatumWriter<Record> datumWriter = new GenericDatumWriter<>(schema);
+      DataFileWriter<Record> w = new DataFileWriter<>(datumWriter);
+      w.setCodec(codecFactory);
+
+      try (DataFileWriter<Record> writer = w.create(projection, create(outputPath))) {
+        for (Record record : reader) {
+          writer.append(record);
+          count += 1;
+        }
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count, e);
+    } finally {
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
+      }
+    }
+
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Create an Avro file from a Parquet file",
+        "sample.parquet sample.avro",
+        "# Create an Avro file in HDFS from a local JSON file",
+        "path/to/sample.json hdfs:/user/me/sample.parquet",
+        "# Create an Avro file from data in S3",
+        "s3:/data/path/sample.parquet sample.avro"
+    );
+  }
+}
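
A minimal sketch of the Avro writer setup in run() above, assuming the imports of the class plus java.io.File and java.io.FileOutputStream; here Avro's deflate codec is used directly instead of the Codecs helper, and the file names and "field_0" are hypothetical:

    Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
    DataFileWriter<Record> w = new DataFileWriter<>(new GenericDatumWriter<Record>(schema));
    w.setCodec(CodecFactory.deflateCodec(9));
    try (DataFileWriter<Record> writer = w.create(schema, new FileOutputStream("sample.avro"))) {
      Record record = new Record(schema);
      record.put("field_0", "value");  // hypothetical field name
      writer.append(record);
    }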

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java
new file mode 100644
index 0000000..47cd665
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSV.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import au.com.bytecode.opencsv.CSVParser;
+import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static java.lang.Math.min;
+
+public class AvroCSV {
+
+  private static final Pattern LONG = Pattern.compile("\\d+");
+  private static final Pattern DOUBLE = Pattern.compile("\\d*\\.\\d*[dD]?");
+  private static final Pattern FLOAT = Pattern.compile("\\d*\\.\\d*[fF]?");
+  private static final int DEFAULT_INFER_LINES = 25;
+  private static final Set<String> NO_REQUIRED_FIELDS = ImmutableSet.of();
+  // As per the Avro spec (http://avro.apache.org/docs/1.7.5/spec.html), a name must
+  // start with [A-Za-z_] and subsequently contain only [A-Za-z0-9_]
+  private static final Pattern AVRO_COMPATIBLE = Pattern.
+      compile("^[A-Za-z_][A-Za-z\\d_]*$");
+
+  static CSVReader newReader(InputStream incoming, CSVProperties props) {
+    return new CSVReader(
+        new InputStreamReader(incoming, Charset.forName(props.charset)),
+        props.delimiter.charAt(0), props.quote.charAt(0),
+        props.escape.charAt(0), props.linesToSkip,
+        false /* strict quotes off: don't ignore unquoted strings */,
+        true /* ignore leading white-space */ );
+  }
+
+  static CSVParser newParser(CSVProperties props) {
+    return new CSVParser(
+        props.delimiter.charAt(0), props.quote.charAt(0),
+        props.escape.charAt(0),
+        false /* strict quotes off: don't ignore unquoted strings */,
+        true /* ignore leading white-space */ );
+  }
+
+  public static Schema inferNullableSchema(String name, InputStream incoming,
+                                           CSVProperties props)
+      throws IOException {
+    return inferSchemaInternal(name, incoming, props, NO_REQUIRED_FIELDS, true);
+  }
+
+  public static Schema inferNullableSchema(String name, InputStream incoming,
+                                           CSVProperties props,
+                                           Set<String> requiredFields)
+      throws IOException {
+    return inferSchemaInternal(name, incoming, props, requiredFields, true);
+  }
+
+  public static Schema inferSchema(String name, InputStream incoming,
+                                   CSVProperties props)
+      throws IOException {
+    return inferSchemaInternal(name, incoming, props, NO_REQUIRED_FIELDS, false);
+  }
+
+  public static Schema inferSchema(String name, InputStream incoming,
+                                   CSVProperties props,
+                                   Set<String> requiredFields)
+      throws IOException {
+    return inferSchemaInternal(name, incoming, props, requiredFields, false);
+  }
+
+  private static Schema inferSchemaInternal(String name, InputStream incoming,
+                                            CSVProperties props,
+                                            Set<String> requiredFields,
+                                            boolean makeNullable)
+      throws IOException {
+    CSVReader reader = newReader(incoming, props);
+
+    String[] header;
+    String[] line;
+    if (props.useHeader) {
+      // read the header and then the first line
+      header = reader.readNext();
+      line = reader.readNext();
+      Preconditions.checkNotNull(line, "No content to infer schema");
+
+    } else if (props.header != null) {
+      header = newParser(props).parseLine(props.header);
+      line = reader.readNext();
+      Preconditions.checkNotNull(line, "No content to infer schema");
+
+    } else {
+      // use the first line to create a header
+      line = reader.readNext();
+      Preconditions.checkNotNull(line, "No content to infer schema");
+      header = new String[line.length];
+      for (int i = 0; i < line.length; i += 1) {
+        header[i] = "field_" + String.valueOf(i);
+      }
+    }
+
+    Schema.Type[] types = new Schema.Type[header.length];
+    String[] values = new String[header.length];
+    boolean[] nullable = new boolean[header.length];
+    boolean[] empty = new boolean[header.length];
+
+    for (int processed = 0; processed < DEFAULT_INFER_LINES; processed += 1) {
+      if (line == null) {
+        break;
+      }
+
+      for (int i = 0; i < header.length; i += 1) {
+        if (i < line.length) {
+          if (types[i] == null) {
+            types[i] = inferFieldType(line[i]);
+            if (types[i] != null) {
+              // keep track of the value used
+              values[i] = line[i];
+            }
+          }
+
+          if (line[i] == null) {
+            nullable[i] = true;
+          } else if (line[i].isEmpty()) {
+            empty[i] = true;
+          }
+        } else {
+          // no value results in null
+          nullable[i] = true;
+        }
+      }
+
+      line = reader.readNext();
+    }
+
+    SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder.record(name).fields();
+
+    // types may be missing, but schema() will return a nullable string
+    for (int i = 0; i < header.length; i += 1) {
+      if (header[i] == null) {
+        throw new RuntimeException("Bad header for field " + i + ": null");
+      }
+
+      String fieldName = header[i].trim();
+
+      if (fieldName.isEmpty()) {
+        throw new RuntimeException(
+            "Bad header for field " + i + ": \"" + fieldName + "\"");
+      } else if (!isAvroCompatibleName(fieldName)) {
+        throw new RuntimeException(
+            "Bad header for field " + i + ": \"" + fieldName + "\" " +
+            "(must start with a letter or _ and contain only letters, digits, and _)");
+      }
+
+      // the empty string is not considered null for string fields
+      boolean foundNull = (nullable[i] ||
+          (empty[i] && types[i] != Schema.Type.STRING));
+
+      if (requiredFields.contains(fieldName)) {
+        if (foundNull) {
+          throw new RuntimeException("Found null value for required field: " +
+              fieldName + " (" + types[i] + ")");
+        }
+        fieldAssembler = fieldAssembler.name(fieldName)
+            .doc("Type inferred from '" + sample(values[i]) + "'")
+            .type(schema(types[i], false)).noDefault();
+      } else {
+        SchemaBuilder.GenericDefault<Schema> defaultBuilder = fieldAssembler.name(fieldName)
+            .doc("Type inferred from '" + sample(values[i]) + "'")
+            .type(schema(types[i], makeNullable || foundNull));
+        if (makeNullable || foundNull) {
+          fieldAssembler = defaultBuilder.withDefault(null);
+        } else {
+          fieldAssembler = defaultBuilder.noDefault();
+        }
+      }
+    }
+    return fieldAssembler.endRecord();
+  }
+
+  private static final CharMatcher NON_PRINTABLE = CharMatcher
+      .inRange('\u0020', '\u007e').negate();
+
+  private static String sample(String value) {
+    if (value != null) {
+      return NON_PRINTABLE.replaceFrom(
+          value.subSequence(0, min(50, value.length())), '.');
+    } else {
+      return "null";
+    }
+  }
+
+  /**
+   * Create a {@link Schema} for the given type. If the type is null,
+   * the schema will be a nullable string. If {@code makeNullable} is true, the
+   * returned schema will be nullable.
+   *
+   * @param type a {@link Schema.Type} compatible with {@code Schema.create}
+   * @param makeNullable If {@code true}, the return type will be nullable
+   * @return a {@code Schema} for the given {@code Schema.Type}
+   * @see Schema#create(org.apache.avro.Schema.Type)
+   */
+  private static Schema schema(Schema.Type type, boolean makeNullable) {
+    Schema schema = Schema.create(type == null ? Schema.Type.STRING : type);
+    if (makeNullable || type == null) {
+      schema = Schema.createUnion(Lists.newArrayList(
+          Schema.create(Schema.Type.NULL), schema));
+    }
+    return schema;
+  }
+
+  private static Schema.Type inferFieldType(String example) {
+    if (example == null || example.isEmpty()) {
+      return null; // not enough information
+    } else if (LONG.matcher(example).matches()) {
+      return Schema.Type.LONG;
+    } else if (DOUBLE.matcher(example).matches()) {
+      return Schema.Type.DOUBLE;
+    } else if (FLOAT.matcher(example).matches()) {
+      return Schema.Type.FLOAT;
+    }
+    return Schema.Type.STRING;
+  }
+
+  /**
+   * Returns true if the name is compatible with the naming rules in the Avro
+   * specification.
+   *
+   * @param name a String field name to check
+   * @return true if the name is Avro-compatible, false otherwise
+   */
+  private static boolean isAvroCompatibleName(String name) {
+    return AVRO_COMPATIBLE.matcher(name).matches();
+  }
+}
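
A minimal sketch of schema inference with this class, assuming java.io.FileInputStream in addition to the imports above; "sample.csv" is hypothetical:

    CSVProperties props = new CSVProperties.Builder()
        .hasHeader()
        .build();
    try (InputStream in = new FileInputStream("sample.csv")) {
      Schema schema = AvroCSV.inferNullableSchema("sample", in, props);
      System.out.println(schema.toString(true));
    }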

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java
new file mode 100644
index 0000000..8d5e835
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/AvroCSVReader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import au.com.bytecode.opencsv.CSVReader;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.parquet.cli.util.RuntimeIOException;
+import org.apache.avro.Schema;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class AvroCSVReader<E> implements Iterator<E>, Iterable<E>, Closeable {
+
+  private final boolean reuseRecords;
+  private final CSVReader reader;
+  private final RecordBuilder<E> builder;
+  private boolean hasNext = false;
+  private String[] next = null;
+  private E record = null;
+
+  public AvroCSVReader(InputStream stream, CSVProperties props,
+                       Schema schema, Class<E> type, boolean reuseRecords) {
+    this.reader = AvroCSV.newReader(stream, props);
+    this.reuseRecords = reuseRecords;
+
+    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
+        "Schemas for CSV files must be records of primitive types");
+
+    List<String> header = null;
+    if (props.useHeader) {
+      this.hasNext = advance();
+      header = Lists.newArrayList(next);
+    } else if (props.header != null) {
+      try {
+        header = Lists.newArrayList(
+            AvroCSV.newParser(props).parseLine(props.header));
+      } catch (IOException e) {
+        throw new RuntimeIOException(
+            "Failed to parse header from properties: " + props.header, e);
+      }
+    }
+
+    this.builder = new RecordBuilder<>(schema, type, header);
+
+    // initialize by reading the first record
+    this.hasNext = advance();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return hasNext;
+  }
+
+  @Override
+  public E next() {
+    if (!hasNext) {
+      throw new NoSuchElementException();
+    }
+
+    try {
+      if (reuseRecords) {
+        this.record = builder.makeRecord(next, record);
+        return record;
+      } else {
+        return builder.makeRecord(next, null);
+      }
+    } finally {
+      this.hasNext = advance();
+    }
+  }
+
+  private boolean advance() {
+    try {
+      next = reader.readNext();
+    } catch (IOException ex) {
+      throw new RuntimeIOException("Could not read record", ex);
+    }
+    return (next != null);
+  }
+
+  @Override
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      throw new RuntimeIOException("Cannot close reader", e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Remove is not implemented.");
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return this;
+  }
+}
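
A minimal usage sketch for this reader, assuming java.io.FileInputStream and org.apache.avro.generic.GenericData in addition to the imports above; the CSV path is hypothetical and the schema is inferred as in the AvroCSV sketch:

    CSVProperties props = new CSVProperties.Builder().hasHeader().build();
    Schema schema;
    try (InputStream in = new FileInputStream("sample.csv")) {
      schema = AvroCSV.inferNullableSchema("sample", in, props);
    }
    try (AvroCSVReader<GenericData.Record> reader = new AvroCSVReader<>(
        new FileInputStream("sample.csv"), props, schema, GenericData.Record.class, false)) {
      for (GenericData.Record record : reader) {
        System.out.println(record);
      }
    }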

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java
new file mode 100644
index 0000000..bd4ba06
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/CSVProperties.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import javax.annotation.concurrent.Immutable;
+import org.apache.commons.lang.StringEscapeUtils;
+
+@Immutable
+public class CSVProperties {
+
+  public static final String DEFAULT_CHARSET = "utf8";
+  public static final String DEFAULT_DELIMITER = ",";
+  public static final String DEFAULT_QUOTE = "\"";
+  public static final String DEFAULT_ESCAPE = "\\";
+  public static final String DEFAULT_HAS_HEADER = "false";
+  public static final int DEFAULT_LINES_TO_SKIP = 0;
+
+  // configuration
+  public final String charset;
+  public final String delimiter;
+  public final String quote;
+  public final String escape;
+  public final String header;
+  public final boolean useHeader;
+  public final int linesToSkip;
+
+  private CSVProperties(String charset, String delimiter, String quote,
+                        String escape, String header, boolean useHeader,
+                        int linesToSkip) {
+    this.charset = charset;
+    this.delimiter = delimiter;
+    this.quote = quote;
+    this.escape = escape;
+    this.header = header;
+    this.useHeader = useHeader;
+    this.linesToSkip = linesToSkip;
+  }
+
+  public static class Builder {
+    private String charset = DEFAULT_CHARSET;
+    private String delimiter = DEFAULT_DELIMITER;
+    private String quote = DEFAULT_QUOTE;
+    private String escape = DEFAULT_ESCAPE;
+    private boolean useHeader = Boolean.valueOf(DEFAULT_HAS_HEADER);
+    private int linesToSkip = DEFAULT_LINES_TO_SKIP;
+    private String header = null;
+
+    public Builder charset(String charset) {
+      this.charset = charset;
+      return this;
+    }
+
+    public Builder delimiter(String delimiter) {
+      this.delimiter = StringEscapeUtils.unescapeJava(delimiter);
+      return this;
+    }
+
+    public Builder quote(String quote) {
+      this.quote = StringEscapeUtils.unescapeJava(quote);
+      return this;
+    }
+
+    public Builder escape(String escape) {
+      this.escape = StringEscapeUtils.unescapeJava(escape);
+      return this;
+    }
+
+    public Builder header(String header) {
+      this.header = header;
+      return this;
+    }
+
+    public Builder hasHeader() {
+      this.useHeader = true;
+      return this;
+    }
+
+    public Builder hasHeader(boolean hasHeader) {
+      this.useHeader = hasHeader;
+      return this;
+    }
+
+    public Builder linesToSkip(int linesToSkip) {
+      this.linesToSkip = linesToSkip;
+      return this;
+    }
+
+    public CSVProperties build() {
+      return new CSVProperties(
+          charset, delimiter, quote, escape,
+          header, useHeader, linesToSkip);
+    }
+  }
+}
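For reference, a minimal usage sketch of the builder above, assuming a tab-separated input with one comment line before the header row (the option values are illustrative; only the CSVProperties API shown in this diff is assumed):

    CSVProperties props = new CSVProperties.Builder()
        .charset("utf8")      // same as DEFAULT_CHARSET
        .delimiter("\\t")     // unescaped to a real tab by StringEscapeUtils
        .quote("\"")          // literal double quote
        .escape("\\\\")       // unescaped to a single backslash
        .hasHeader()          // the first non-skipped line names the columns
        .linesToSkip(1)       // skip the comment line before the header
        .build();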

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
new file mode 100644
index 0000000..9adf22e
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/csv/RecordBuilder.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.csv;
+
+import org.apache.parquet.cli.util.RecordException;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.ReflectData;
+import java.util.List;
+
+class RecordBuilder<E> {
+  private final Schema schema;
+  private final Class<E> recordClass;
+  private final Schema.Field[] fields;
+  private final int[] indexes; // Record position to CSV field position
+
+  public RecordBuilder(Schema schema, Class<E> recordClass, List<String> header) {
+    this.schema = schema;
+    this.recordClass = recordClass;
+
+    // initialize the index and field arrays
+    fields = schema.getFields().toArray(new Schema.Field[schema.getFields().size()]);
+    indexes = new int[fields.length];
+
+    if (header != null) {
+      for (int i = 0; i < fields.length; i += 1) {
+        fields[i] = schema.getFields().get(i);
+        indexes[i] = Integer.MAX_VALUE; // never present in the row
+      }
+
+      // map fields by name using the header
+      for (int i = 0; i < header.size(); i += 1) {
+        Schema.Field field = schema.getField(header.get(i));
+        if (field != null) {
+          indexes[field.pos()] = i;
+        }
+      }
+
+    } else {
+      // without a header, map to fields by position
+      for (int i = 0; i < fields.length; i += 1) {
+        fields[i] = schema.getFields().get(i);
+        indexes[i] = i;
+      }
+    }
+  }
+
+  public E makeRecord(String[] fields, E reuse) {
+    E record = reuse;
+    if (record == null) {
+      record = newRecordInstance();
+    }
+
+    if (record instanceof IndexedRecord) {
+      fillIndexed((IndexedRecord) record, fields);
+    } else {
+      fillReflect(record, fields);
+    }
+
+    return record;
+  }
+
+  @SuppressWarnings("unchecked")
+  private E newRecordInstance() {
+    if (recordClass != GenericData.Record.class && !recordClass.isInterface()) {
+      E record = (E) ReflectData.newInstance(recordClass, schema);
+      if (record != null) {
+        return record;
+      }
+    }
+    return (E) new GenericData.Record(schema);
+  }
+
+  private void fillIndexed(IndexedRecord record, String[] data) {
+    for (int i = 0; i < indexes.length; i += 1) {
+      int index = indexes[i];
+      record.put(i,
+          makeValue(index < data.length ? data[index] : null, fields[i]));
+    }
+  }
+
+  private void fillReflect(Object record, String[] data) {
+    for (int i = 0; i < indexes.length; i += 1) {
+      Schema.Field field = fields[i];
+      int index = indexes[i];
+      Object value = makeValue(index < data.length ? data[index] : null, field);
+      ReflectData.get().setField(record, field.name(), i, value);
+    }
+  }
+
+  private static Object makeValue(String string, Schema.Field field) {
+    try {
+      Object value = makeValue(string, field.schema());
+      if (value != null || Schemas.nullOk(field.schema())) {
+        return value;
+      } else {
+        // this will fail if there is no default value
+        return ReflectData.get().getDefaultValue(field);
+      }
+    } catch (RecordException e) {
+      // add the field name to the error message
+      throw new RecordException(String.format(
+          "Cannot convert field %s", field.name()), e);
+    } catch (NumberFormatException e) {
+      throw new RecordException(String.format(
+          "Field %s: value not a %s: '%s'",
+          field.name(), field.schema(), string), e);
+    } catch (AvroRuntimeException e) {
+      throw new RecordException(String.format(
+          "Field %s: cannot make %s value: '%s'",
+          field.name(), field.schema(), string), e);
+    }
+  }
+
+  /**
+   * Returns the value as the first matching schema type or null.
+   *
+   * Note that the value may be null even if the schema does not allow the
+   * value to be null.
+   *
+   * @param string a String representation of the value
+   * @param schema a Schema
+   * @return the string coerced to the correct type from the schema or null
+   */
+  private static Object makeValue(String string, Schema schema) {
+    if (string == null) {
+      return null;
+    }
+
+    try {
+      switch (schema.getType()) {
+        case BOOLEAN:
+          return Boolean.valueOf(string);
+        case STRING:
+          return string;
+        case FLOAT:
+          return Float.valueOf(string);
+        case DOUBLE:
+          return Double.valueOf(string);
+        case INT:
+          return Integer.valueOf(string);
+        case LONG:
+          return Long.valueOf(string);
+        case ENUM:
+          // TODO: translate to enum class
+          if (schema.hasEnumSymbol(string)) {
+            return string;
+          } else {
+            try {
+              return schema.getEnumSymbols().get(Integer.parseInt(string));
+            } catch (IndexOutOfBoundsException ex) {
+              return null;
+            }
+          }
+        case UNION:
+          Object value = null;
+          for (Schema possible : schema.getTypes()) {
+            value = makeValue(string, possible);
+            if (value != null) {
+              return value;
+            }
+          }
+          return null;
+        case NULL:
+          return null;
+        default:
+          // FIXED, BYTES, MAP, ARRAY, RECORD are not supported
+          throw new RecordException(
+              "Unsupported field type:" + schema.getType());
+      }
+    } catch (NumberFormatException e) {
+      // empty string is considered null for numeric types
+      if (string.isEmpty()) {
+        return null;
+      } else {
+        throw e;
+      }
+    }
+  }
+}
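Within the package, the builder above turns one parsed CSV row into an Avro record. A short sketch with an illustrative two-field schema (the schema literal, header, and row values are made up; only the RecordBuilder API shown in this diff is assumed):

    import java.util.Arrays;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
        + "{\"name\": \"id\", \"type\": \"long\"},"
        + "{\"name\": \"name\", \"type\": [\"null\", \"string\"]}]}");

    // with a header, CSV column order does not need to match field order
    RecordBuilder<GenericData.Record> builder = new RecordBuilder<>(
        schema, GenericData.Record.class, Arrays.asList("name", "id"));

    // produces a GenericData.Record with id=42L and name="alice"
    GenericData.Record record = builder.makeRecord(
        new String[] {"alice", "42"}, null);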

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
new file mode 100644
index 0000000..f17ee83
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJson.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BinaryNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.NumericNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.parquet.cli.util.RecordException;
+import org.apache.parquet.cli.util.RuntimeIOException;
+import org.apache.parquet.cli.util.Schemas;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class AvroJson {
+
+  private static final JsonFactory FACTORY = new JsonFactory();
+
+  public static Iterator<JsonNode> parser(final InputStream stream) {
+    try {
+      JsonParser parser = FACTORY.createParser(stream);
+      parser.setCodec(new ObjectMapper());
+      return parser.readValuesAs(JsonNode.class);
+    } catch (IOException e) {
+      throw new RuntimeIOException("Cannot read from stream", e);
+    }
+  }
+
+  public static JsonNode parse(String json) {
+    return parse(json, JsonNode.class);
+  }
+
+  public static <T> T parse(String json, Class<T> returnType) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.readValue(json, returnType);
+    } catch (JsonParseException e) {
+      throw new IllegalArgumentException("Invalid JSON", e);
+    } catch (JsonMappingException e) {
+      throw new IllegalArgumentException("Invalid JSON", e);
+    } catch (IOException e) {
+      throw new RuntimeIOException("Cannot initialize JSON parser", e);
+    }
+  }
+
+  public static JsonNode parse(InputStream json) {
+    return parse(json, JsonNode.class);
+  }
+
+  public static <T> T parse(InputStream json, Class<T> returnType) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return mapper.readValue(json, returnType);
+    } catch (JsonParseException e) {
+      throw new IllegalArgumentException("Invalid JSON stream", e);
+    } catch (JsonMappingException e) {
+      throw new IllegalArgumentException("Invalid JSON stream", e);
+    } catch (IOException e) {
+      throw new RuntimeIOException("Cannot initialize JSON parser", e);
+    }
+  }
+
+  public static Object convertToAvro(GenericData model, JsonNode datum,
+                                     Schema schema) {
+    if (datum == null) {
+      return null;
+    }
+    switch (schema.getType()) {
+      case RECORD:
+        RecordException.check(datum.isObject(),
+            "Cannot convert non-object to record: %s", datum);
+        Object record = model.newRecord(null, schema);
+        for (Schema.Field field : schema.getFields()) {
+          model.setField(record, field.name(), field.pos(),
+              convertField(model, datum.get(field.name()), field));
+        }
+        return record;
+
+      case MAP:
+        RecordException.check(datum.isObject(),
+            "Cannot convert non-object to map: %s", datum);
+        Map<String, Object> map = Maps.newLinkedHashMap();
+        Iterator<Map.Entry<String, JsonNode>> iter = datum.fields();
+        while (iter.hasNext()) {
+          Map.Entry<String, JsonNode> entry = iter.next();
+          map.put(entry.getKey(), convertToAvro(
+              model, entry.getValue(), schema.getValueType()));
+        }
+        return map;
+
+      case ARRAY:
+        RecordException.check(datum.isArray(),
+            "Cannot convert to array: %s", datum);
+        List<Object> list = Lists.newArrayListWithExpectedSize(datum.size());
+        for (JsonNode element : datum) {
+          list.add(convertToAvro(model, element, schema.getElementType()));
+        }
+        return list;
+
+      case UNION:
+        return convertToAvro(model, datum,
+            resolveUnion(datum, schema.getTypes()));
+
+      case BOOLEAN:
+        RecordException.check(datum.isBoolean(),
+            "Cannot convert to boolean: %s", datum);
+        return datum.booleanValue();
+
+      case FLOAT:
+        RecordException.check(datum.isFloat() || datum.isInt(),
+            "Cannot convert to float: %s", datum);
+        return datum.floatValue();
+
+      case DOUBLE:
+        RecordException.check(
+            datum.isDouble() || datum.isFloat() ||
+            datum.isLong() || datum.isInt(),
+            "Cannot convert to double: %s", datum);
+        return datum.doubleValue();
+
+      case INT:
+        RecordException.check(datum.isInt(),
+            "Cannot convert to int: %s", datum);
+        return datum.intValue();
+
+      case LONG:
+        RecordException.check(datum.isLong() || datum.isInt(),
+            "Cannot convert to long: %s", datum);
+        return datum.longValue();
+
+      case STRING:
+        RecordException.check(datum.isTextual(),
+            "Cannot convert to string: %s", datum);
+        return datum.textValue();
+
+      case ENUM:
+        RecordException.check(datum.isTextual(),
+            "Cannot convert to string: %s", datum);
+        return model.createEnum(datum.textValue(), schema);
+
+      case BYTES:
+        RecordException.check(datum.isBinary(),
+            "Cannot convert to binary: %s", datum);
+        try {
+          return ByteBuffer.wrap(datum.binaryValue());
+        } catch (IOException e) {
+          throw new RecordException("Failed to read JSON binary", e);
+        }
+
+      case FIXED:
+        RecordException.check(datum.isBinary(),
+            "Cannot convert to fixed: %s", datum);
+        byte[] bytes;
+        try {
+          bytes = datum.binaryValue();
+        } catch (IOException e) {
+          throw new RecordException("Failed to read JSON binary", e);
+        }
+        RecordException.check(bytes.length == schema.getFixedSize(),
+            "Binary data has wrong length: %s bytes for %s", bytes.length, schema);
+        return model.createFixed(null, bytes, schema);
+
+      case NULL:
+        return null;
+
+      default:
+        // don't use RecordException because this is a Schema problem
+        throw new IllegalArgumentException("Unknown schema type: " + schema);
+    }
+  }
+
+  private static Object convertField(GenericData model, JsonNode datum,
+                                     Schema.Field field) {
+    try {
+      Object value = convertToAvro(model, datum, field.schema());
+      if (value != null || Schemas.nullOk(field.schema())) {
+        return value;
+      } else {
+        return model.getDefaultValue(field);
+      }
+    } catch (RecordException e) {
+      // add the field name to the error message
+      throw new RecordException(String.format(
+          "Cannot convert field %s", field.name()), e);
+    } catch (AvroRuntimeException e) {
+      throw new RecordException(String.format(
+          "Field %s: cannot make %s value: '%s'",
+          field.name(), field.schema(), String.valueOf(datum)), e);
+    }
+  }
+
+  private static Schema resolveUnion(JsonNode datum, Collection<Schema> schemas) {
+    Set<Schema.Type> primitives = Sets.newHashSet();
+    List<Schema> others = Lists.newArrayList();
+    for (Schema schema : schemas) {
+      if (PRIMITIVES.containsKey(schema.getType())) {
+        primitives.add(schema.getType());
+      } else {
+        others.add(schema);
+      }
+    }
+
+    // Try to identify specific primitive types
+    Schema primitiveSchema = null;
+    if (datum == null || datum.isNull()) {
+      primitiveSchema = closestPrimitive(primitives, Schema.Type.NULL);
+    } else if (datum.isShort() || datum.isInt()) {
+      primitiveSchema = closestPrimitive(primitives,
+          Schema.Type.INT, Schema.Type.LONG,
+          Schema.Type.FLOAT, Schema.Type.DOUBLE);
+    } else if (datum.isLong()) {
+      primitiveSchema = closestPrimitive(primitives,
+          Schema.Type.LONG, Schema.Type.DOUBLE);
+    } else if (datum.isFloat()) {
+      primitiveSchema = closestPrimitive(primitives,
+          Schema.Type.FLOAT, Schema.Type.DOUBLE);
+    } else if (datum.isDouble()) {
+      primitiveSchema = closestPrimitive(primitives, Schema.Type.DOUBLE);
+    } else if (datum.isBoolean()) {
+      primitiveSchema = closestPrimitive(primitives, Schema.Type.BOOLEAN);
+    }
+
+    if (primitiveSchema != null) {
+      return primitiveSchema;
+    }
+
+    // otherwise, select the first schema that matches the datum
+    for (Schema schema : others) {
+      if (matches(datum, schema)) {
+        return schema;
+      }
+    }
+
+    throw new RecordException(String.format(
+        "Cannot resolve union: %s not in %s", datum, schemas));
+  }
+
+  // this does not contain string, bytes, or fixed because the datum type
+  // doesn't necessarily determine the schema.
+  private static final ImmutableMap<Schema.Type, Schema> PRIMITIVES = ImmutableMap
+      .<Schema.Type, Schema>builder()
+      .put(Schema.Type.NULL, Schema.create(Schema.Type.NULL))
+      .put(Schema.Type.BOOLEAN, Schema.create(Schema.Type.BOOLEAN))
+      .put(Schema.Type.INT, Schema.create(Schema.Type.INT))
+      .put(Schema.Type.LONG, Schema.create(Schema.Type.LONG))
+      .put(Schema.Type.FLOAT, Schema.create(Schema.Type.FLOAT))
+      .put(Schema.Type.DOUBLE, Schema.create(Schema.Type.DOUBLE))
+      .build();
+
+  private static Schema closestPrimitive(Set<Schema.Type> possible, Schema.Type... types) {
+    for (Schema.Type type : types) {
+      if (possible.contains(type) && PRIMITIVES.containsKey(type)) {
+        return PRIMITIVES.get(type);
+      }
+    }
+    return null;
+  }
+
+  private static boolean matches(JsonNode datum, Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        if (datum.isObject()) {
+          // check that each field is present or has a default
+          boolean missingField = false;
+          for (Schema.Field field : schema.getFields()) {
+            if (!datum.has(field.name()) && field.defaultValue() == null) {
+              missingField = true;
+              break;
+            }
+          }
+          if (!missingField) {
+            return true;
+          }
+        }
+        break;
+      case UNION:
+        if (resolveUnion(datum, schema.getTypes()) != null) {
+          return true;
+        }
+        break;
+      case MAP:
+        if (datum.isObject()) {
+          return true;
+        }
+        break;
+      case ARRAY:
+        if (datum.isArray()) {
+          return true;
+        }
+        break;
+      case BOOLEAN:
+        if (datum.isBoolean()) {
+          return true;
+        }
+        break;
+      case FLOAT:
+        if (datum.isFloat() || datum.isInt()) {
+          return true;
+        }
+        break;
+      case DOUBLE:
+        if (datum.isDouble() || datum.isFloat() ||
+            datum.isLong() || datum.isInt()) {
+          return true;
+        }
+        break;
+      case INT:
+        if (datum.isInt()) {
+          return true;
+        }
+        break;
+      case LONG:
+        if (datum.isLong() || datum.isInt()) {
+          return true;
+        }
+        break;
+      case STRING:
+        if (datum.isTextual()) {
+          return true;
+        }
+        break;
+      case ENUM:
+        if (datum.isTextual() && schema.hasEnumSymbol(datum.textValue())) {
+          return true;
+        }
+        break;
+      case BYTES:
+      case FIXED:
+        if (datum.isBinary()) {
+          return true;
+        }
+        break;
+      case NULL:
+        if (datum == null || datum.isNull()) {
+          return true;
+        }
+        break;
+      default: // unknown type
+        throw new IllegalArgumentException("Unsupported schema: " + schema);
+    }
+    return false;
+  }
+
+  public static Schema inferSchema(InputStream incoming, final String name,
+                                   int numRecords) {
+    Iterator<Schema> schemas = Iterators.transform(parser(incoming),
+        new Function<JsonNode, Schema>() {
+          @Override
+          public Schema apply(JsonNode node) {
+            return inferSchema(node, name);
+          }
+        });
+
+    if (!schemas.hasNext()) {
+      return null;
+    }
+
+    Schema result = schemas.next();
+    for (int i = 1; schemas.hasNext() && i < numRecords; i += 1) {
+      result = Schemas.merge(result, schemas.next());
+    }
+
+    return result;
+  }
+
+  public static Schema inferSchema(JsonNode node, String name) {
+    return visit(node, new JsonSchemaVisitor(name));
+  }
+
+  public static Schema inferSchemaWithMaps(JsonNode node, String name) {
+    return visit(node, new JsonSchemaVisitor(name).useMaps());
+  }
+
+  private static class JsonSchemaVisitor extends JsonTreeVisitor<Schema> {
+
+    private static final Joiner DOT = Joiner.on('.');
+    private final String name;
+    private boolean objectsToRecords = true;
+
+    public JsonSchemaVisitor(String name) {
+      this.name = name;
+    }
+
+    public JsonSchemaVisitor useMaps() {
+      this.objectsToRecords = false;
+      return this;
+    }
+
+    @Override
+    public Schema object(ObjectNode object, Map<String, Schema> fields) {
+      if (objectsToRecords || recordLevels.size() < 1) {
+        List<Schema.Field> recordFields = Lists.newArrayListWithExpectedSize(
+            fields.size());
+
+        for (Map.Entry<String, Schema> entry : fields.entrySet()) {
+          recordFields.add(new Schema.Field(
+              entry.getKey(), entry.getValue(),
+              "Type inferred from '" + object.get(entry.getKey()) + "'",
+              null));
+        }
+
+        Schema recordSchema;
+        if (recordLevels.size() < 1) {
+          recordSchema = Schema.createRecord(name, null, null, false);
+        } else {
+          recordSchema = Schema.createRecord(
+              DOT.join(recordLevels), null, null, false);
+        }
+
+        recordSchema.setFields(recordFields);
+
+        return recordSchema;
+
+      } else {
+        // translate to a map; use LinkedHashSet to preserve schema order
+        switch (fields.size()) {
+          case 0:
+            return Schema.createMap(Schema.create(Schema.Type.NULL));
+          case 1:
+            return Schema.createMap(Iterables.getOnlyElement(fields.values()));
+          default:
+            return Schema.createMap(Schemas.mergeOrUnion(fields.values()));
+        }
+      }
+    }
+
+    @Override
+    public Schema array(ArrayNode ignored, List<Schema> elementSchemas) {
+      // use LinkedHashSet to preserve schema order
+      switch (elementSchemas.size()) {
+        case 0:
+          return Schema.createArray(Schema.create(Schema.Type.NULL));
+        case 1:
+          return Schema.createArray(Iterables.getOnlyElement(elementSchemas));
+        default:
+          return Schema.createArray(Schemas.mergeOrUnion(elementSchemas));
+      }
+    }
+
+    @Override
+    public Schema binary(BinaryNode ignored) {
+      return Schema.create(Schema.Type.BYTES);
+    }
+
+    @Override
+    public Schema text(TextNode ignored) {
+      return Schema.create(Schema.Type.STRING);
+    }
+
+    @Override
+    public Schema number(NumericNode number) {
+      if (number.isInt()) {
+        return Schema.create(Schema.Type.INT);
+      } else if (number.isLong()) {
+        return Schema.create(Schema.Type.LONG);
+      } else if (number.isFloat()) {
+        return Schema.create(Schema.Type.FLOAT);
+      } else if (number.isDouble()) {
+        return Schema.create(Schema.Type.DOUBLE);
+      } else {
+        throw new UnsupportedOperationException(
+            number.getClass().getName() + " is not supported");
+      }
+    }
+
+    @Override
+    public Schema bool(BooleanNode ignored) {
+      return Schema.create(Schema.Type.BOOLEAN);
+    }
+
+    @Override
+    public Schema nullNode(NullNode ignored) {
+      return Schema.create(Schema.Type.NULL);
+    }
+
+    @Override
+    public Schema missing(MissingNode ignored) {
+      throw new UnsupportedOperationException("MissingNode is not supported.");
+    }
+  }
+
+  private static <T> T visit(JsonNode node, JsonTreeVisitor<T> visitor) {
+    switch (node.getNodeType()) {
+      case OBJECT:
+        Preconditions.checkArgument(node instanceof ObjectNode,
+            "Expected instance of ObjectNode: " + node);
+
+        // use LinkedHashMap to preserve field order
+        Map<String, T> fields = Maps.newLinkedHashMap();
+
+        Iterator<Map.Entry<String, JsonNode>> iter = node.fields();
+        while (iter.hasNext()) {
+          Map.Entry<String, JsonNode> entry = iter.next();
+
+          visitor.recordLevels.push(entry.getKey());
+          fields.put(entry.getKey(), visit(entry.getValue(), visitor));
+          visitor.recordLevels.pop();
+        }
+
+        return visitor.object((ObjectNode) node, fields);
+
+      case ARRAY:
+        Preconditions.checkArgument(node instanceof ArrayNode,
+            "Expected instance of ArrayNode: " + node);
+
+        List<T> elements = Lists.newArrayListWithExpectedSize(node.size());
+
+        for (JsonNode element : node) {
+          elements.add(visit(element, visitor));
+        }
+
+        return visitor.array((ArrayNode) node, elements);
+
+      case BINARY:
+        Preconditions.checkArgument(node instanceof BinaryNode,
+            "Expected instance of BinaryNode: " + node);
+        return visitor.binary((BinaryNode) node);
+
+      case STRING:
+        Preconditions.checkArgument(node instanceof TextNode,
+            "Expected instance of TextNode: " + node);
+
+        return visitor.text((TextNode) node);
+
+      case NUMBER:
+        Preconditions.checkArgument(node instanceof NumericNode,
+            "Expected instance of NumericNode: " + node);
+
+        return visitor.number((NumericNode) node);
+
+      case BOOLEAN:
+        Preconditions.checkArgument(node instanceof BooleanNode,
+            "Expected instance of BooleanNode: " + node);
+
+        return visitor.bool((BooleanNode) node);
+
+      case MISSING:
+        Preconditions.checkArgument(node instanceof MissingNode,
+            "Expected instance of MissingNode: " + node);
+
+        return visitor.missing((MissingNode) node);
+
+      case NULL:
+        Preconditions.checkArgument(node instanceof NullNode,
+            "Expected instance of NullNode: " + node);
+
+        return visitor.nullNode((NullNode) node);
+
+      default:
+        throw new IllegalArgumentException(
+            "Unknown node type: " + node.getNodeType() + ": " + node);
+    }
+  }
+
+  private abstract static class JsonTreeVisitor<T> {
+    protected LinkedList<String> recordLevels = Lists.newLinkedList();
+
+    public T object(ObjectNode object, Map<String, T> fields) {
+      return null;
+    }
+
+    public T array(ArrayNode array, List<T> elements) {
+      return null;
+    }
+
+    public T binary(BinaryNode binary) {
+      return null;
+    }
+
+    public T text(TextNode text) {
+      return null;
+    }
+
+    public T number(NumericNode number) {
+      return null;
+    }
+
+    public T bool(BooleanNode bool) {
+      return null;
+    }
+
+    public T missing(MissingNode missing) {
+      return null;
+    }
+
+    public T nullNode(NullNode nullNode) {
+      return null;
+    }
+  }
+}
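As a rough illustration of the two main entry points above, this sketch infers a schema from a single JSON document and then converts that document to an Avro record (the JSON literal and record name are made up; only the AvroJson methods shown in this diff are assumed):

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    JsonNode node = AvroJson.parse("{\"id\": 1, \"name\": \"alice\"}");

    // infers a record named "Example" with an int field and a string field
    Schema schema = AvroJson.inferSchema(node, "Example");

    // converts the same document into a GenericData.Record for that schema
    Object record = AvroJson.convertToAvro(GenericData.get(), node, schema);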

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/ddbeb4dd/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java
----------------------------------------------------------------------
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java
new file mode 100644
index 0000000..a3b067d
--- /dev/null
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/json/AvroJsonReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.json;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.parquet.cli.util.RuntimeIOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+public class AvroJsonReader<E> implements Iterator<E>, Iterable<E>, Closeable {
+
+  private final GenericData model;
+  private final Schema schema;
+  private final InputStream stream;
+  private Iterator<E> iterator;
+
+  public AvroJsonReader(InputStream stream, Schema schema) {
+    this.stream = stream;
+    this.schema = schema;
+    this.model = GenericData.get();
+    this.iterator = Iterators.transform(AvroJson.parser(stream),
+        new Function<JsonNode, E>() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public E apply(@Nullable JsonNode node) {
+            return (E) AvroJson.convertToAvro(
+                model, node, AvroJsonReader.this.schema);
+          }
+        });
+  }
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public E next() {
+    return iterator.next();
+  }
+
+  @Override
+  public void close() {
+    iterator = null;
+    try {
+      stream.close();
+    } catch (IOException e) {
+      throw new RuntimeIOException("Cannot close reader", e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Remove is not implemented.");
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return this;
+  }
+}
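A minimal sketch of reading a stream of JSON documents through the reader above, using a schema inferred with AvroJson (the input bytes are illustrative; only the classes in this diff plus the JDK are assumed):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;

    Schema schema = AvroJson.inferSchema(
        AvroJson.parse("{\"id\": 1}"), "Example");
    InputStream in = new ByteArrayInputStream(
        "{\"id\": 1}\n{\"id\": 2}\n".getBytes(StandardCharsets.UTF_8));

    // the reader is both an Iterator and an Iterable, so for-each works
    try (AvroJsonReader<GenericData.Record> reader =
             new AvroJsonReader<>(in, schema)) {
      for (GenericData.Record record : reader) {
        System.out.println(record);
      }
    }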