Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:41 UTC

[44/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
new file mode 100644
index 0000000..de350dd
--- /dev/null
+++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
@@ -0,0 +1,182 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Test;
+import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.thrift.test.Name;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetTupleScheme {
+  final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
+  final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
+
+  @Test
+  public void testReadPattern() throws Exception {
+    String sourceFolder = parquetInputPath;
+    testReadWrite(sourceFolder);
+
+    String sourceGlobPattern = parquetInputPath + "/*";
+    testReadWrite(sourceGlobPattern);
+
+    String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
+    testReadWrite(multiLevelGlobPattern);
+  }
+
+  @Test
+  public void testFieldProjection() throws Exception {
+    createFileForRead();
+
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
+    Tap source = new Hfs(sourceScheme, parquetInputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("last_name"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new ProjectedTupleFunction());
+    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+    assertEquals("Practice\nHope\nHorse\n", result);
+  }
+
+  public void testReadWrite(String inputPath) throws Exception {
+    createFileForRead();
+
+    Path path = new Path(txtOutputPath);
+    final FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) fs.delete(path, true);
+
+    Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
+    Tap source = new Hfs(sourceScheme, inputPath);
+
+    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+    Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+    Pipe assembly = new Pipe("namecp");
+    assembly = new Each(assembly, new UnpackTupleFunction());
+    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+    flow.complete();
+    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
+  }
+
+  private void createFileForRead() throws Exception {
+    final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
+
+    final Configuration conf = new Configuration();
+    final FileSystem fs = fileToCreate.getFileSystem(conf);
+    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
+
+    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+
+    Name n1 = new Name();
+    n1.setFirst_name("Alice");
+    n1.setLast_name("Practice");
+    Name n2 = new Name();
+    n2.setFirst_name("Bob");
+    n2.setLast_name("Hope");
+    Name n3 = new Name();
+    n3.setFirst_name("Charlie");
+    n3.setLast_name("Horse");
+
+    n1.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n2.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    baos.reset();
+    n3.write(protocol);
+    w.write(new BytesWritable(baos.toByteArray()));
+    w.close();
+  }
+
+  private static class UnpackTupleFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Tuple name = new Tuple();
+      name.addString(arguments.getString(0));
+      name.addString(arguments.getString(1));
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+
+  private static class ProjectedTupleFunction extends BaseOperation implements Function {
+    @Override
+    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+      TupleEntry arguments = functionCall.getArguments();
+      Tuple result = new Tuple();
+
+      Tuple name = new Tuple();
+      name.addString(arguments.getString(0));
+//      name.addString(arguments.getString(1));
+
+      result.add(name);
+      functionCall.getOutputCollector().add(result);
+    }
+  }
+}
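
For downstream users, the visible effect of this rename is the new package for the Cascading scheme classes. A minimal source-tap construction against the renamed package looks like the sketch below (the input path is illustrative, and projection works as exercised by testFieldProjection above):

    import org.apache.parquet.cascading.ParquetTupleScheme;
    import cascading.scheme.Scheme;
    import cascading.tap.Tap;
    import cascading.tap.hadoop.Hfs;
    import cascading.tuple.Fields;

    public class SchemeExample {
      // Builds a Parquet source tap that projects only the two name columns,
      // so only those columns are read from the underlying files.
      public static Tap parquetSource(String inputPath) {
        Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
        return new Hfs(sourceScheme, inputPath);
      }
    }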

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
deleted file mode 100644
index 8e5b96b..0000000
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.BaseOperation;
-import cascading.operation.Function;
-import cascading.operation.FunctionCall;
-import cascading.pipe.Each;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextLine;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import parquet.hadoop.thrift.ThriftToParquetFileWriter;
-import parquet.hadoop.util.ContextUtil;
-import parquet.thrift.test.Name;
-
-import java.io.File;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestParquetTBaseScheme {
-  final String txtInputPath = "src/test/resources/names.txt";
-  final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
-  final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
-  final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";
-
-  @Test
-  public void testWrite() throws Exception {
-    Path path = new Path(parquetOutputPath);
-    JobConf jobConf = new JobConf();
-    final FileSystem fs = path.getFileSystem(jobConf);
-    if (fs.exists(path)) fs.delete(path, true);
-
-    Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) );
-    Tap source = new Hfs(sourceScheme, txtInputPath);
-
-    Scheme sinkScheme = new ParquetTBaseScheme(Name.class);
-    Tap sink = new Hfs(sinkScheme, parquetOutputPath);
-
-    Pipe assembly = new Pipe( "namecp" );
-    assembly = new Each(assembly, new PackThriftFunction());
-    HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
-    Flow flow  = hadoopFlowConnector.connect("namecp", source, sink, assembly);
-
-    flow.complete();
-
-    assertTrue(fs.exists(new Path(parquetOutputPath)));
-    assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
-    assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
-  }
-
-  @Test
-  public void testRead() throws Exception {
-    doRead(new ParquetTBaseScheme(Name.class));
-  }
-
-  @Test
-  public void testReadWithoutClass() throws Exception {
-    doRead(new ParquetTBaseScheme());
-  }
-
-  private void doRead(Scheme sourceScheme) throws Exception {
-    createFileForRead();
-
-    Path path = new Path(txtOutputPath);
-    final FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) fs.delete(path, true);
-
-    Tap source = new Hfs(sourceScheme, parquetInputPath);
-
-    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
-    Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
-    Pipe assembly = new Pipe( "namecp" );
-    assembly = new Each(assembly, new UnpackThriftFunction());
-    Flow flow  = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
-    flow.complete();
-    String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000"));
-    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
-  }
-
-
-  private void createFileForRead() throws Exception {
-    final Path fileToCreate = new Path(parquetInputPath+"/names.parquet");
-
-    final Configuration conf = new Configuration();
-    final FileSystem fs = fileToCreate.getFileSystem(conf);
-    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
-
-    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
-    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
-    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
-
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
-    Name n1 = new Name();
-    n1.setFirst_name("Alice");
-    n1.setLast_name("Practice");
-    Name n2 = new Name();
-    n2.setFirst_name("Bob");
-    n2.setLast_name("Hope");
-    Name n3 = new Name();
-    n3.setFirst_name("Charlie");
-    n3.setLast_name("Horse");
-
-    n1.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    baos.reset();
-    n2.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    baos.reset();
-    n3.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    w.close();
-  }
-
-  private static class PackThriftFunction extends BaseOperation implements Function {
-    @Override
-    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
-      TupleEntry arguments = functionCall.getArguments();
-      Tuple result = new Tuple();
-
-      Name name = new Name();
-      name.setFirst_name(arguments.getString(0));
-      name.setLast_name(arguments.getString(1));
-
-      result.add(name);
-      functionCall.getOutputCollector().add(result);
-    }
-  }
-
-  private static class UnpackThriftFunction extends BaseOperation implements Function {
-    @Override
-    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
-      TupleEntry arguments = functionCall.getArguments();
-      Tuple result = new Tuple();
-
-      Name name = (Name) arguments.get(0);
-      result.add(name.getFirst_name());
-      result.add(name.getLast_name());
-      functionCall.getOutputCollector().add(result);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java
deleted file mode 100644
index 564beaf..0000000
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.BaseOperation;
-import cascading.operation.Function;
-import cascading.operation.FunctionCall;
-import cascading.pipe.Each;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextLine;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import parquet.hadoop.thrift.ThriftToParquetFileWriter;
-import parquet.hadoop.util.ContextUtil;
-import parquet.thrift.test.Name;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestParquetTupleScheme {
-  final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
-  final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
-
-  @Test
-  public void testReadPattern() throws Exception {
-    String sourceFolder = parquetInputPath;
-    testReadWrite(sourceFolder);
-
-    String sourceGlobPattern = parquetInputPath + "/*";
-    testReadWrite(sourceGlobPattern);
-
-    String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
-    testReadWrite(multiLevelGlobPattern);
-  }
-
-  @Test
-  public void testFieldProjection() throws Exception {
-    createFileForRead();
-
-    Path path = new Path(txtOutputPath);
-    final FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) fs.delete(path, true);
-
-    Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
-    Tap source = new Hfs(sourceScheme, parquetInputPath);
-
-    Scheme sinkScheme = new TextLine(new Fields("last_name"));
-    Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
-    Pipe assembly = new Pipe("namecp");
-    assembly = new Each(assembly, new ProjectedTupleFunction());
-    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
-    flow.complete();
-    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
-    assertEquals("Practice\nHope\nHorse\n", result);
-  }
-
-  public void testReadWrite(String inputPath) throws Exception {
-    createFileForRead();
-
-    Path path = new Path(txtOutputPath);
-    final FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) fs.delete(path, true);
-
-    Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
-    Tap source = new Hfs(sourceScheme, inputPath);
-
-    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
-    Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
-    Pipe assembly = new Pipe("namecp");
-    assembly = new Each(assembly, new UnpackTupleFunction());
-    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
-    flow.complete();
-    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
-    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
-  }
-
-  private void createFileForRead() throws Exception {
-    final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
-
-    final Configuration conf = new Configuration();
-    final FileSystem fs = fileToCreate.getFileSystem(conf);
-    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
-
-    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
-    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
-    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
-
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
-    Name n1 = new Name();
-    n1.setFirst_name("Alice");
-    n1.setLast_name("Practice");
-    Name n2 = new Name();
-    n2.setFirst_name("Bob");
-    n2.setLast_name("Hope");
-    Name n3 = new Name();
-    n3.setFirst_name("Charlie");
-    n3.setLast_name("Horse");
-
-    n1.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    baos.reset();
-    n2.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    baos.reset();
-    n3.write(protocol);
-    w.write(new BytesWritable(baos.toByteArray()));
-    w.close();
-  }
-
-  private static class UnpackTupleFunction extends BaseOperation implements Function {
-    @Override
-    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
-      TupleEntry arguments = functionCall.getArguments();
-      Tuple result = new Tuple();
-
-      Tuple name = new Tuple();
-      name.addString(arguments.getString(0));
-      name.addString(arguments.getString(1));
-
-      result.add(name);
-      functionCall.getOutputCollector().add(result);
-    }
-  }
-
-  private static class ProjectedTupleFunction extends BaseOperation implements Function {
-    @Override
-    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
-      TupleEntry arguments = functionCall.getArguments();
-      Tuple result = new Tuple();
-
-      Tuple name = new Tuple();
-      name.addString(arguments.getString(0));
-//      name.addString(arguments.getString(1));
-
-      result.add(name);
-      functionCall.getOutputCollector().add(result);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading/src/test/thrift/test.thrift
index 640e124..c58843d 100644
--- a/parquet-cascading/src/test/thrift/test.thrift
+++ b/parquet-cascading/src/test/thrift/test.thrift
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-namespace java parquet.thrift.test
+namespace java org.apache.parquet.thrift.test
 
 struct Name {
   1: required string first_name,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index b8fe029..e50f909 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -18,7 +18,7 @@
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
-    <groupId>com.twitter</groupId>
+    <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
     <version>1.7.0-incubating-SNAPSHOT</version>
@@ -37,12 +37,12 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-encoding</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -53,7 +53,7 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-encoding</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
@@ -107,7 +107,7 @@
               <relocations>
                 <relocation>
                   <pattern>it.unimi.dsi</pattern>
-                  <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
+                  <shadedPattern>org.apache.parquet.it.unimi.dsi</shadedPattern>
                 </relocation>
               </relocations>
             </configuration>
@@ -127,7 +127,7 @@
           </execution>
         </executions>
         <configuration>
-          <mainClass>parquet.filter2.Generator</mainClass>          
+          <mainClass>org.apache.parquet.filter2.Generator</mainClass>
           <arguments>
             <argument>${basedir}/target/generated-src</argument>
           </arguments>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
new file mode 100644
index 0000000..5a44116
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
@@ -0,0 +1,129 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import java.util.Arrays;
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Describes a column's type as well as its position in its containing schema.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ColumnDescriptor implements Comparable<ColumnDescriptor> {
+
+  private final String[] path;
+  private final PrimitiveTypeName type;
+  private final int typeLength;
+  private final int maxRep;
+  private final int maxDef;
+
+  /**
+   *
+   * @param path the path to the leaf field in the schema
+   * @param type the type of the field
+   * @param maxRep the maximum repetition level for that path
+   * @param maxDef the maximum definition level for that path
+   */
+  public ColumnDescriptor(String[] path, PrimitiveTypeName type, int maxRep, 
+                          int maxDef) {
+    this(path, type, 0, maxRep, maxDef);
+  }
+
+  /**
+   *
+   * @param path the path to the leaf field in the schema
+   * @param type the type of the field
+   * @param maxRep the maximum repetition level for that path
+   * @param maxDef the maximum definition level for that path
+   */
+  public ColumnDescriptor(String[] path, PrimitiveTypeName type, 
+                          int typeLength, int maxRep, int maxDef) {
+    super();
+    this.path = path;
+    this.type = type;
+    this.typeLength = typeLength;
+    this.maxRep = maxRep;
+    this.maxDef = maxDef;
+  }
+
+  /**
+   * @return the path to the leaf field in the schema
+   */
+  public String[] getPath() {
+    return path;
+  }
+
+  /**
+   * @return the maximum repetition level for that path
+   */
+  public int getMaxRepetitionLevel() {
+    return maxRep;
+  }
+
+  /**
+   * @return  the maximum definition level for that path
+   */
+  public int getMaxDefinitionLevel() {
+    return maxDef;
+  }
+
+  /**
+   * @return the type of that column
+   */
+  public PrimitiveTypeName getType() {
+    return type;
+  }
+
+  /**
+   * @return the size of the type
+   **/
+  public int getTypeLength() {
+    return typeLength;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(path);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return Arrays.equals(path, ((ColumnDescriptor)obj).path);
+  }
+
+  @Override
+  public int compareTo(ColumnDescriptor o) {
+    // TODO(julien): this will fail if o.path.length < this.path.length
+    for (int i = 0; i < path.length; i++) {
+      int compareTo = path[i].compareTo(o.path[i]);
+      if (compareTo != 0) {
+        return compareTo;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(path) + " " + type;
+  }
+}
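
For reference, a minimal sketch of constructing a descriptor by hand; the path, type, and level values below are illustrative assumptions, not taken from this commit:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    public class ColumnDescriptorExample {
      public static void main(String[] args) {
        // A repeated INT64 leaf "links.backward" nested one level deep:
        // max repetition level 1, max definition level 2 (illustrative).
        ColumnDescriptor d = new ColumnDescriptor(
            new String[] { "links", "backward" },
            PrimitiveTypeName.INT64,
            1,   // max repetition level
            2);  // max definition level
        System.out.println(d);                         // [links, backward] INT64
        System.out.println(d.getMaxDefinitionLevel()); // 2
      }
    }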

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java
new file mode 100644
index 0000000..813666a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java
@@ -0,0 +1,34 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+/**
+ * Container which can produce a ColumnReader for any given column in a schema.
+ *
+ * @author Julien Le Dem
+ */
+public interface ColumnReadStore {
+
+  /**
+   * @param path the column to read
+   * @return the column reader for that descriptor
+   */
+  abstract public ColumnReader getColumnReader(ColumnDescriptor path);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
new file mode 100644
index 0000000..f802c92
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
@@ -0,0 +1,115 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reader for (repetition level, definition level, values) triplets.
+ * At any given point in time, a ColumnReader points to a single (r, d, v) triplet.
+ * In order to move to the next triplet, call {@link #consume()}.
+ *
+ * Depending on the type and the encoding of the column, only a subset of the get* methods is implemented.
+ * Dictionary specific methods enable the upper layers to read the dictionary IDs without decoding the data.
+ * In particular the Converter will decode the strings in the dictionary only once and iterate on the
+ * dictionary IDs instead of the values.
+ *
+ * Each iteration looks at the current definition level and value as well as the next
+ * repetition level: <ul>
+ *  <li> The current definition level defines whether the value is null.</li>
+ *  <li> If the value is defined, we can read it with the correct get*() method.</li>
+ *  <li> Looking ahead to the next repetition level determines which column to read next in the FSA.</li>
+ * </ul>
+ * @author Julien Le Dem
+  */
+public interface ColumnReader {
+
+  /**
+   * @return the totalCount of values to be consumed
+   */
+  long getTotalValueCount();
+
+  /**
+   * Consume the current triplet, moving to the next value.
+   */
+  void consume();
+
+  /**
+   * must return 0 when isFullyConsumed() == true
+   * @return the repetition level for the current value
+   */
+  int getCurrentRepetitionLevel();
+
+  /**
+   * @return the definition level for the current value
+   */
+  int getCurrentDefinitionLevel();
+
+  /**
+   * writes the current value to the converter
+   */
+  void writeCurrentValueToConverter();
+
+  /**
+   * Skip the current value
+   */
+  void skip();
+
+  /**
+   * available when the underlying encoding is dictionary based
+   * @return the dictionary id for the current value
+   */
+  int getCurrentValueDictionaryID();
+
+  /**
+   * @return the current value
+   */
+  int getInteger();
+
+  /**
+   * @return the current value
+   */
+  boolean getBoolean();
+
+  /**
+   * @return the current value
+   */
+  long getLong();
+
+  /**
+   * @return the current value
+   */
+  Binary getBinary();
+
+  /**
+   * @return the current value
+   */
+  float getFloat();
+
+  /**
+   * @return the current value
+   */
+  double getDouble();
+
+  /**
+   * @return Descriptor of the column.
+   */
+  ColumnDescriptor getDescriptor();
+
+}
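
A sketch of the consume loop the javadoc above describes, assuming a ColumnReadStore and an INT32 column; the exact read-before-consume contract is implementation-specific, so treat this as schematic:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.ColumnReadStore;
    import org.apache.parquet.column.ColumnReader;

    public class TripletScan {
      // Prints every defined INT32 value in one column; null positions
      // (definition level below the maximum) are passed over.
      static void scan(ColumnReadStore store, ColumnDescriptor column) {
        ColumnReader reader = store.getColumnReader(column);
        int maxDef = column.getMaxDefinitionLevel();
        for (long i = 0, n = reader.getTotalValueCount(); i < n; i++) {
          if (reader.getCurrentDefinitionLevel() == maxDef) {
            System.out.println(reader.getInteger());  // the value is defined
          }
          reader.consume();  // advance to the next (r, d, v) triplet
        }
      }
    }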

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
new file mode 100644
index 0000000..739c00f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
@@ -0,0 +1,61 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+/**
+ * Container which can construct writers for multiple columns to be stored
+ * together.
+ *
+ * @author Julien Le Dem
+ */
+public interface ColumnWriteStore {
+  /**
+   * @param path the column for which to create a writer
+   * @return the column writer for the given column
+   */
+  abstract public ColumnWriter getColumnWriter(ColumnDescriptor path);
+
+  /**
+   * called when we are done writing, to flush to the underlying storage
+   */
+  abstract public void flush();
+
+  /**
+   * called to notify of record boundaries
+   */
+  abstract public void endRecord();
+
+  /**
+   * provided for informational purposes
+   * @return approximate size used in memory
+   */
+  abstract public long getAllocatedSize();
+
+  /**
+   * used to flush row groups to disk
+   * @return approximate size of the buffered encoded binary data
+   */
+  abstract public long getBufferedSize();
+
+  /**
+   * used for debugging purposes
+   * @return a formatted string representing memory usage per column
+   */
+  abstract public String memUsageString();
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
new file mode 100644
index 0000000..7605c50
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
@@ -0,0 +1,87 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * writer for (repetition level, definition level, values) triplets
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface ColumnWriter {
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void write(int value, int repetitionLevel, int definitionLevel);
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void write(long value, int repetitionLevel, int definitionLevel);
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void write(boolean value, int repetitionLevel, int definitionLevel);
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void write(Binary value, int repetitionLevel, int definitionLevel);
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void write(float value, int repetitionLevel, int definitionLevel);
+
+  /**
+   * writes the current value
+   * @param value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void write(double value, int repetitionLevel, int definitionLevel);
+
+  /**
+   * writes the current null value
+   * @param repetitionLevel
+   * @param definitionLevel
+   */
+  void writeNull(int repetitionLevel, int definitionLevel);
+
+}
+
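
Taken together with ColumnWriteStore above, the write path is symmetric to the read path. A schematic example for a single required INT32 leaf (the store construction and row-group sizing are elided assumptions):

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.ColumnWriteStore;
    import org.apache.parquet.column.ColumnWriter;

    public class WritePath {
      // Writes one INT32 value per record to a single leaf column.
      static void writeRecords(ColumnWriteStore store, ColumnDescriptor column, int[] values) {
        ColumnWriter writer = store.getColumnWriter(column);
        for (int v : values) {
          // (value, repetition level, definition level); both levels are 0
          // for a required top-level field.
          writer.write(v, 0, column.getMaxDefinitionLevel());
          store.endRecord();  // notify the store of the record boundary
          // A real writer would consult store.getBufferedSize() here to
          // decide when a row group is large enough to flush.
        }
        store.flush();  // done writing: push buffered data to the page store
      }
    }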

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java
new file mode 100644
index 0000000..45fd42d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java
@@ -0,0 +1,66 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * a dictionary to decode dictionary-based encodings
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class Dictionary {
+
+  private final Encoding encoding;
+
+  public Dictionary(Encoding encoding) {
+    this.encoding = encoding;
+  }
+
+  public Encoding getEncoding() {
+    return encoding;
+  }
+
+  public abstract int getMaxId();
+
+  public Binary decodeToBinary(int id) {
+    throw new UnsupportedOperationException(this.getClass().getName());
+  }
+
+  public int decodeToInt(int id) {
+    throw new UnsupportedOperationException(this.getClass().getName());
+  }
+
+  public long decodeToLong(int id) {
+    throw new UnsupportedOperationException(this.getClass().getName());
+  }
+
+  public float decodeToFloat(int id) {
+    throw new UnsupportedOperationException(this.getClass().getName());
+  }
+
+  public double decodeToDouble(int id) {
+    throw new UnsupportedOperationException(this.getClass().getName());
+  }
+
+  public boolean decodeToBoolean(int id) {
+    throw new UnsupportedOperationException(this.getClass().getName());
+  }
+}
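
The Converter pattern described in the ColumnReader javadoc, decoding each dictionary entry once and then iterating over IDs, looks roughly like this; null handling via definition levels is omitted for brevity:

    import org.apache.parquet.column.ColumnReader;
    import org.apache.parquet.column.Dictionary;

    public class DictionaryScan {
      // Decode every dictionary entry once, then look values up by ID
      // instead of re-decoding the string for each row.
      static void scanBinary(ColumnReader reader, Dictionary dict) {
        String[] decoded = new String[dict.getMaxId() + 1];
        for (int id = 0; id <= dict.getMaxId(); id++) {
          decoded[id] = dict.decodeToBinary(id).toStringUsingUTF8();
        }
        for (long i = 0, n = reader.getTotalValueCount(); i < n; i++) {
          System.out.println(decoded[reader.getCurrentValueDictionaryID()]);
          reader.consume();
        }
      }
    }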

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
new file mode 100644
index 0000000..9770044
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
@@ -0,0 +1,291 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
+import org.apache.parquet.column.values.boundedint.ZeroIntegerValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainBinaryDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainDoubleDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainFloatDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainIntegerDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainLongDictionary;
+import org.apache.parquet.column.values.plain.BinaryPlainValuesReader;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesReader;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * encoding of the data
+ *
+ * @author Julien Le Dem
+ *
+ */
+public enum Encoding {
+
+  PLAIN {
+    @Override
+    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+      switch (descriptor.getType()) {
+      case BOOLEAN:
+        return new BooleanPlainValuesReader();
+      case BINARY:
+        return new BinaryPlainValuesReader();
+      case FLOAT:
+        return new FloatPlainValuesReader();
+      case DOUBLE:
+        return new DoublePlainValuesReader();
+      case INT32:
+        return new IntegerPlainValuesReader();
+      case INT64:
+        return new LongPlainValuesReader();
+      case INT96:
+        return new FixedLenByteArrayPlainValuesReader(12);
+      case FIXED_LEN_BYTE_ARRAY:
+        return new FixedLenByteArrayPlainValuesReader(descriptor.getTypeLength());
+      default:
+        throw new ParquetDecodingException("no plain reader for type " + descriptor.getType());
+      }
+    }
+
+    @Override
+    public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+      switch (descriptor.getType()) {
+      case BINARY:
+        return new PlainBinaryDictionary(dictionaryPage);
+      case FIXED_LEN_BYTE_ARRAY:
+        return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
+      case INT96:
+        return new PlainBinaryDictionary(dictionaryPage, 12);
+      case INT64:
+        return new PlainLongDictionary(dictionaryPage);
+      case DOUBLE:
+        return new PlainDoubleDictionary(dictionaryPage);
+      case INT32:
+        return new PlainIntegerDictionary(dictionaryPage);
+      case FLOAT:
+        return new PlainFloatDictionary(dictionaryPage);
+      default:
+        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
+      }
+
+    }
+  },
+
+  /**
+   * Actually a combination of bit packing and run length encoding.
+   * TODO: Should we rename this to be more clear?
+   */
+  RLE {
+    @Override
+    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+      int bitWidth = BytesUtils.getWidthFromMaxInt(getMaxLevel(descriptor, valuesType));
+      if(bitWidth == 0) {
+        return new ZeroIntegerValuesReader();
+      }
+      return new RunLengthBitPackingHybridValuesReader(bitWidth);
+    }
+  },
+
+  /**
+   * @deprecated This is no longer used, and has been replaced by {@link #RLE}
+   * which is a combination of bit packing and RLE
+   */
+  @Deprecated
+  BIT_PACKED {
+    @Override
+    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+      return new ByteBitPackingValuesReader(getMaxLevel(descriptor, valuesType), BIG_ENDIAN);
+    }
+  },
+
+  /**
+   * @deprecated now replaced by RLE_DICTIONARY for the data page encoding and PLAIN for the dictionary page encoding
+   */
+  @Deprecated
+  PLAIN_DICTIONARY {
+    @Override
+    public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+      return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary);
+    }
+
+    @Override
+    public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+      return PLAIN.initDictionary(descriptor, dictionaryPage);
+    }
+
+    @Override
+    public boolean usesDictionary() {
+      return true;
+    }
+
+  },
+
+  /**
+   * Delta encoding for integers. This can be used for int columns and works best
+   * on sorted data
+   */
+  DELTA_BINARY_PACKED {
+    @Override
+    public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+      if(descriptor.getType() != INT32) {
+        throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32");
+      }
+      return new DeltaBinaryPackingValuesReader();
+    }
+  },
+
+  /**
+   * Encoding for byte arrays to separate the length values and the data. The lengths
+   * are encoded using DELTA_BINARY_PACKED
+   */
+  DELTA_LENGTH_BYTE_ARRAY {
+    @Override
+    public ValuesReader getValuesReader(ColumnDescriptor descriptor,
+        ValuesType valuesType) {
+      if (descriptor.getType() != BINARY) {
+        throw new ParquetDecodingException("Encoding DELTA_LENGTH_BYTE_ARRAY is only supported for type BINARY");
+      }
+      return new DeltaLengthByteArrayValuesReader();
+    }
+  },
+
+  /**
+   * Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
+   * Suffixes are stored as delta length byte arrays.
+   */
+  DELTA_BYTE_ARRAY {
+    @Override
+    public ValuesReader getValuesReader(ColumnDescriptor descriptor,
+        ValuesType valuesType) {
+      if (descriptor.getType() != BINARY) {
+        throw new ParquetDecodingException("Encoding DELTA_BYTE_ARRAY is only supported for type BINARY");
+      }
+      return new DeltaByteArrayReader();
+    }
+  },
+
+  /**
+   * Dictionary encoding: the ids are encoded using the RLE encoding
+   */
+  RLE_DICTIONARY {
+
+    @Override
+    public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+      switch (descriptor.getType()) {
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96:
+      case INT64:
+      case DOUBLE:
+      case INT32:
+      case FLOAT:
+        return new DictionaryValuesReader(dictionary);
+      default:
+        throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
+      }
+    }
+
+    @Override
+    public boolean usesDictionary() {
+      return true;
+    }
+
+  };
+
+  int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType) {
+    int maxLevel;
+    switch (valuesType) {
+    case REPETITION_LEVEL:
+      maxLevel = descriptor.getMaxRepetitionLevel();
+      break;
+    case DEFINITION_LEVEL:
+      maxLevel = descriptor.getMaxDefinitionLevel();
+      break;
+    case VALUES:
+      if(descriptor.getType() == BOOLEAN) {
+        maxLevel = 1;
+        break;
+      }
+    default:
+      throw new ParquetDecodingException("Unsupported encoding for values: " + this);
+    }
+    return maxLevel;
+  }
+
+  /**
+   * @return whether this encoding requires a dictionary
+   */
+  public boolean usesDictionary() {
+    return false;
+  }
+
+  /**
+   * initializes a dictionary from a page
+   * @param dictionaryPage
+   * @return the corresponding dictionary
+   */
+  public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+    throw new UnsupportedOperationException(this.name() + " does not support dictionary");
+  }
+
+  /**
+   * To read decoded values that don't require a dictionary
+   *
+   * @param descriptor the column to read
+   * @param valuesType the type of values
+   * @return the proper values reader for the given column
+   * @throws UnsupportedOperationException if the encoding is dictionary based
+   */
+  public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+    throw new UnsupportedOperationException("Error decoding " + descriptor + ". " + this.name() + " is dictionary based");
+  }
+
+  /**
+   * To read decoded values that require a dictionary
+   *
+   * @param descriptor the column to read
+   * @param valuesType the type of values
+   * @param dictionary the dictionary
+   * @return the proper values reader for the given column
+   * @throws UnsupportedOperationException if the encoding is not dictionary based
+   */
+  public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+    throw new UnsupportedOperationException(this.name() + " is not dictionary based");
+  }
+
+}
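
A sketch of how a page decoder selects a reader through this enum; the descriptor comes from the file schema, and the returned reader must still be initialized from the page bytes before use:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.column.ValuesType;
    import org.apache.parquet.column.values.ValuesReader;

    public class ReaderSelection {
      // Picks the decoder for the values section of a PLAIN-encoded page.
      // Dictionary-based encodings go through getDictionaryBasedValuesReader
      // with a Dictionary obtained from initDictionary instead.
      static ValuesReader valuesReaderFor(ColumnDescriptor column) {
        return Encoding.PLAIN.getValuesReader(column, ValuesType.VALUES);
      }
    }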

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
new file mode 100644
index 0000000..df44c4b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -0,0 +1,242 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.boundedint.DevNullValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * This class represents all the configurable Parquet properties.
+ *
+ * @author amokashi
+ *
+ */
+public class ParquetProperties {
+
+  public enum WriterVersion {
+    PARQUET_1_0 ("v1"),
+    PARQUET_2_0 ("v2");
+
+    private final String shortName;
+
+    WriterVersion(String shortName) {
+      this.shortName = shortName;
+    }
+
+    public static WriterVersion fromString(String name) {
+      for (WriterVersion v : WriterVersion.values()) {
+        if (v.shortName.equals(name)) {
+          return v;
+        }
+      }
+      // throws IllegalArgumentException if name does not exactly match an enum constant name
+      return WriterVersion.valueOf(name);
+    }
+  }
+  private final int dictionaryPageSizeThreshold;
+  private final WriterVersion writerVersion;
+  private final boolean enableDictionary;
+
+  public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
+    this.dictionaryPageSizeThreshold = dictPageSize;
+    this.writerVersion = writerVersion;
+    this.enableDictionary = enableDict;
+  }
+
+  public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
+    if (maxLevel == 0) {
+      return new DevNullValuesWriter();
+    } else {
+      return new RunLengthBitPackingHybridValuesWriter(
+          getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
+    }
+  }
+
+  private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+    switch (path.getType()) {
+    case BOOLEAN:
+      return new BooleanPlainValuesWriter();
+    case INT96:
+      return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
+    case FIXED_LEN_BYTE_ARRAY:
+      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
+    case BINARY:
+    case INT32:
+    case INT64:
+    case DOUBLE:
+    case FLOAT:
+      return new PlainValuesWriter(initialSizePerCol, pageSize);
+    default:
+      throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initialSizePerCol) {
+    Encoding encodingForDataPage;
+    Encoding encodingForDictionaryPage;
+    switch(writerVersion) {
+    case PARQUET_1_0:
+      encodingForDataPage = PLAIN_DICTIONARY;
+      encodingForDictionaryPage = PLAIN_DICTIONARY;
+      break;
+    case PARQUET_2_0:
+      encodingForDataPage = RLE_DICTIONARY;
+      encodingForDictionaryPage = PLAIN;
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown version: " + writerVersion);
+    }
+    switch (path.getType()) {
+    case BOOLEAN:
+      throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
+    case BINARY:
+      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case INT32:
+      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case INT64:
+      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case INT96:
+      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
+    case DOUBLE:
+      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case FLOAT:
+      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+    case FIXED_LEN_BYTE_ARRAY:
+      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
+    default:
+      throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+    switch(writerVersion) {
+    case PARQUET_1_0:
+      return plainWriter(path, initialSizePerCol, pageSize);
+    case PARQUET_2_0:
+      switch (path.getType()) {
+      case BOOLEAN:
+        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
+      case INT32:
+        return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
+      case INT96:
+      case INT64:
+      case DOUBLE:
+      case FLOAT:
+        return plainWriter(path, initialSizePerCol, pageSize);
+      default:
+        throw new IllegalArgumentException("Unknown type " + path.getType());
+      }
+    default:
+      throw new IllegalArgumentException("Unknown version: " + writerVersion);
+    }
+  }
+
+  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+    ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
+    if (enableDictionary) {
+      return FallbackValuesWriter.of(
+          dictionaryWriter(path, initialSizePerCol),
+          writerToFallBackTo);
+    } else {
+      return writerToFallBackTo;
+    }
+  }
+
+  public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+    switch (path.getType()) {
+    case BOOLEAN: // no dictionary encoding for boolean
+      return writerToFallbackTo(path, initialSizePerCol, pageSize);
+    case FIXED_LEN_BYTE_ARRAY:
+      // dictionary encoding for FIXED_LEN_BYTE_ARRAY was not enabled in Parquet 1.0
+      if (writerVersion == WriterVersion.PARQUET_2_0) {
+        return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
+      } else {
+        return writerToFallbackTo(path, initialSizePerCol, pageSize);
+      }
+    case BINARY:
+    case INT32:
+    case INT64:
+    case INT96:
+    case DOUBLE:
+    case FLOAT:
+      return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
+    default:
+      throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  public int getDictionaryPageSizeThreshold() {
+    return dictionaryPageSizeThreshold;
+  }
+
+  public WriterVersion getWriterVersion() {
+    return writerVersion;
+  }
+
+  public boolean isEnableDictionary() {
+    return enableDictionary;
+  }
+
+  public ColumnWriteStore newColumnWriteStore(
+      MessageType schema,
+      PageWriteStore pageStore,
+      int pageSize) {
+    switch (writerVersion) {
+    case PARQUET_1_0:
+      return new ColumnWriteStoreV1(
+          pageStore,
+          pageSize,
+          dictionaryPageSizeThreshold,
+          enableDictionary, writerVersion);
+    case PARQUET_2_0:
+      return new ColumnWriteStoreV2(
+          schema,
+          pageStore,
+          pageSize,
+          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
+    default:
+      throw new IllegalArgumentException("unknown version " + writerVersion);
+    }
+  }
+}
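
A short usage sketch for the class above: constructing ParquetProperties and asking it for a values writer. It assumes ColumnDescriptor's (String[], PrimitiveTypeName, int, int) constructor; the column path and buffer sizes are illustrative:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;
    import org.apache.parquet.column.values.ValuesWriter;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    public class ParquetPropertiesSketch {
      public static void main(String[] args) {
        // 1 MB dictionary page threshold, v2 writer, dictionary encoding enabled
        ParquetProperties props = new ParquetProperties(
            1024 * 1024, WriterVersion.fromString("v2"), true);

        // an INT32 column at path "a.b" with max repetition 0 / max definition 1
        ColumnDescriptor column = new ColumnDescriptor(
            new String[] {"a", "b"}, PrimitiveTypeName.INT32, 0, 1);

        // for v2 + INT32 this yields a dictionary writer that falls back to
        // delta binary packing if the dictionary grows past the threshold
        ValuesWriter writer = props.getValuesWriter(column, 64 * 1024, 1024 * 1024);
        System.out.println(writer.getClass().getSimpleName());
      }
    }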

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java
new file mode 100644
index 0000000..5c05447
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java
@@ -0,0 +1,42 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Thrown if the specified column is unknown in the underlying storage
+ *
+ * @author Julien Le Dem
+ */
+public class UnknownColumnException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  private final ColumnDescriptor descriptor;
+
+  public UnknownColumnException(ColumnDescriptor descriptor) {
+    super("Column not found: " + descriptor.toString());
+    this.descriptor = descriptor;
+  }
+
+  public ColumnDescriptor getDescriptor() {
+    return descriptor;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java
new file mode 100644
index 0000000..126bc48
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java
@@ -0,0 +1,43 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Thrown if the specified column type is unknown in the underlying storage
+ *
+ * @author Katya Gonina
+ */
+public class UnknownColumnTypeException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  private final PrimitiveTypeName type;
+
+  public UnknownColumnTypeException(PrimitiveTypeName type) {
+    super("Column type not found: " + type.toString());
+    this.type = type;
+  }
+
+  public PrimitiveTypeName getType() {
+    return this.type;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java b/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java
new file mode 100644
index 0000000..89cf55b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java
@@ -0,0 +1,29 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+/**
+ * The different types of values we can store in columns
+ *
+ * @author Julien Le Dem
+ *
+ */
+public enum ValuesType {
+  REPETITION_LEVEL, DEFINITION_LEVEL, VALUES;
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
new file mode 100644
index 0000000..bfbcdb9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -0,0 +1,82 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Implementation of the ColumnReadStore
+ *
+ * Initializes individual columns based on schema and converter
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ColumnReadStoreImpl implements ColumnReadStore {
+
+  private final PageReadStore pageReadStore;
+  private final GroupConverter recordConverter;
+  private final MessageType schema;
+
+  /**
+   * @param pageReadStore underlying page storage
+   * @param recordConverter the user provided converter to materialize records
+   * @param schema the schema we are reading
+   */
+  public ColumnReadStoreImpl(PageReadStore pageReadStore, GroupConverter recordConverter, MessageType schema) {
+    super();
+    this.pageReadStore = pageReadStore;
+    this.recordConverter = recordConverter;
+    this.schema = schema;
+  }
+
+  @Override
+  public ColumnReader getColumnReader(ColumnDescriptor path) {
+    return newMemColumnReader(path, pageReadStore.getPageReader(path));
+  }
+
+  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+    PrimitiveConverter converter = getPrimitiveConverter(path);
+    return new ColumnReaderImpl(path, pageReader, converter);
+  }
+
+  private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
+    Type currentType = schema;
+    Converter currentConverter = recordConverter;
+    for (String fieldName : path.getPath()) {
+      final GroupType groupType = currentType.asGroupType();
+      int fieldIndex = groupType.getFieldIndex(fieldName);
+      currentType = groupType.getType(fieldName);
+      currentConverter = currentConverter.asGroupConverter().getConverter(fieldIndex);
+    }
+    PrimitiveConverter converter = currentConverter.asPrimitiveConverter();
+    return converter;
+  }
+
+}
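
getPrimitiveConverter resolves a column by walking its path down the schema while descending through the converter tree by field index. A standalone sketch of that same walk on an illustrative two-level schema (converters omitted; the schema string and field names are made up for the example):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;
    import org.apache.parquet.schema.Type;

    public class SchemaWalkSketch {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message Document { required group name { required binary first; } }");
        // mirror the loop in getPrimitiveConverter for the path ["name", "first"]
        Type current = schema;
        for (String field : new String[] { "name", "first" }) {
          GroupType group = current.asGroupType();
          int index = group.getFieldIndex(field); // index used to pick the child converter
          current = group.getType(field);
          System.out.println(field + " -> field index " + index);
        }
        System.out.println("leaf type: " + current);
      }
    }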