Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:41 UTC
[44/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
new file mode 100644
index 0000000..de350dd
--- /dev/null
+++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Test;
+import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.thrift.test.Name;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetTupleScheme {
+ final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
+ final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
+
+ @Test
+ public void testReadPattern() throws Exception {
+ String sourceFolder = parquetInputPath;
+ testReadWrite(sourceFolder);
+
+ String sourceGlobPattern = parquetInputPath + "/*";
+ testReadWrite(sourceGlobPattern);
+
+ String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
+ testReadWrite(multiLevelGlobPattern);
+ }
+
+ @Test
+ public void testFieldProjection() throws Exception {
+ createFileForRead();
+
+ Path path = new Path(txtOutputPath);
+ final FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) fs.delete(path, true);
+
+ Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
+ Tap source = new Hfs(sourceScheme, parquetInputPath);
+
+ Scheme sinkScheme = new TextLine(new Fields("last_name"));
+ Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+ Pipe assembly = new Pipe("namecp");
+ assembly = new Each(assembly, new ProjectedTupleFunction());
+ Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+ flow.complete();
+ String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+ assertEquals("Practice\nHope\nHorse\n", result);
+ }
+
+ public void testReadWrite(String inputPath) throws Exception {
+ createFileForRead();
+
+ Path path = new Path(txtOutputPath);
+ final FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) fs.delete(path, true);
+
+ Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
+ Tap source = new Hfs(sourceScheme, inputPath);
+
+ Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+ Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+ Pipe assembly = new Pipe("namecp");
+ assembly = new Each(assembly, new UnpackTupleFunction());
+ Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+ flow.complete();
+ String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+ assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
+ }
+
+ private void createFileForRead() throws Exception {
+ final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
+
+ final Configuration conf = new Configuration();
+ final FileSystem fs = fileToCreate.getFileSystem(conf);
+ if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
+
+ TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+ TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+ ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+
+ Name n1 = new Name();
+ n1.setFirst_name("Alice");
+ n1.setLast_name("Practice");
+ Name n2 = new Name();
+ n2.setFirst_name("Bob");
+ n2.setLast_name("Hope");
+ Name n3 = new Name();
+ n3.setFirst_name("Charlie");
+ n3.setLast_name("Horse");
+
+ n1.write(protocol);
+ w.write(new BytesWritable(baos.toByteArray()));
+ baos.reset();
+ n2.write(protocol);
+ w.write(new BytesWritable(baos.toByteArray()));
+ baos.reset();
+ n3.write(protocol);
+ w.write(new BytesWritable(baos.toByteArray()));
+ w.close();
+ }
+
+ private static class UnpackTupleFunction extends BaseOperation implements Function {
+ @Override
+ public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+ TupleEntry arguments = functionCall.getArguments();
+ Tuple result = new Tuple();
+
+ Tuple name = new Tuple();
+ name.addString(arguments.getString(0));
+ name.addString(arguments.getString(1));
+
+ result.add(name);
+ functionCall.getOutputCollector().add(result);
+ }
+ }
+
+ private static class ProjectedTupleFunction extends BaseOperation implements Function {
+ @Override
+ public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+ TupleEntry arguments = functionCall.getArguments();
+ Tuple result = new Tuple();
+
+ Tuple name = new Tuple();
+ name.addString(arguments.getString(0));
+// name.addString(arguments.getString(1));
+
+ result.add(name);
+ functionCall.getOutputCollector().add(result);
+ }
+ }
+}
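For context, a minimal sketch (not part of this commit) of the wiring the test above exercises; the paths are hypothetical, and as in the test an Each function reshapes the Parquet tuples before they reach the text sink:

    Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
    Tap source = new Hfs(sourceScheme, "hdfs:///data/names-parquet-in");   // hypothetical input path

    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
    Tap sink = new Hfs(sinkScheme, "hdfs:///data/names-txt-out");          // hypothetical output path

    Pipe pipe = new Pipe("namecp");
    pipe = new Each(pipe, new UnpackTupleFunction());   // reshape the tuples, as in the test above
    new HadoopFlowConnector().connect("namecp", source, sink, pipe).complete();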
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
deleted file mode 100644
index 8e5b96b..0000000
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.BaseOperation;
-import cascading.operation.Function;
-import cascading.operation.FunctionCall;
-import cascading.pipe.Each;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextLine;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import parquet.hadoop.thrift.ThriftToParquetFileWriter;
-import parquet.hadoop.util.ContextUtil;
-import parquet.thrift.test.Name;
-
-import java.io.File;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestParquetTBaseScheme {
- final String txtInputPath = "src/test/resources/names.txt";
- final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
- final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
- final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";
-
- @Test
- public void testWrite() throws Exception {
- Path path = new Path(parquetOutputPath);
- JobConf jobConf = new JobConf();
- final FileSystem fs = path.getFileSystem(jobConf);
- if (fs.exists(path)) fs.delete(path, true);
-
- Scheme sourceScheme = new TextLine( new Fields( "first", "last" ) );
- Tap source = new Hfs(sourceScheme, txtInputPath);
-
- Scheme sinkScheme = new ParquetTBaseScheme(Name.class);
- Tap sink = new Hfs(sinkScheme, parquetOutputPath);
-
- Pipe assembly = new Pipe( "namecp" );
- assembly = new Each(assembly, new PackThriftFunction());
- HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector();
- Flow flow = hadoopFlowConnector.connect("namecp", source, sink, assembly);
-
- flow.complete();
-
- assertTrue(fs.exists(new Path(parquetOutputPath)));
- assertTrue(fs.exists(new Path(parquetOutputPath + "/_metadata")));
- assertTrue(fs.exists(new Path(parquetOutputPath + "/_common_metadata")));
- }
-
- @Test
- public void testRead() throws Exception {
- doRead(new ParquetTBaseScheme(Name.class));
- }
-
- @Test
- public void testReadWithoutClass() throws Exception {
- doRead(new ParquetTBaseScheme());
- }
-
- private void doRead(Scheme sourceScheme) throws Exception {
- createFileForRead();
-
- Path path = new Path(txtOutputPath);
- final FileSystem fs = path.getFileSystem(new Configuration());
- if (fs.exists(path)) fs.delete(path, true);
-
- Tap source = new Hfs(sourceScheme, parquetInputPath);
-
- Scheme sinkScheme = new TextLine(new Fields("first", "last"));
- Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
- Pipe assembly = new Pipe( "namecp" );
- assembly = new Each(assembly, new UnpackThriftFunction());
- Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
- flow.complete();
- String result = FileUtils.readFileToString(new File(txtOutputPath+"/part-00000"));
- assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
- }
-
-
- private void createFileForRead() throws Exception {
- final Path fileToCreate = new Path(parquetInputPath+"/names.parquet");
-
- final Configuration conf = new Configuration();
- final FileSystem fs = fileToCreate.getFileSystem(conf);
- if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
-
- TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
- TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
- ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
- Name n1 = new Name();
- n1.setFirst_name("Alice");
- n1.setLast_name("Practice");
- Name n2 = new Name();
- n2.setFirst_name("Bob");
- n2.setLast_name("Hope");
- Name n3 = new Name();
- n3.setFirst_name("Charlie");
- n3.setLast_name("Horse");
-
- n1.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- baos.reset();
- n2.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- baos.reset();
- n3.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- w.close();
- }
-
- private static class PackThriftFunction extends BaseOperation implements Function {
- @Override
- public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
- TupleEntry arguments = functionCall.getArguments();
- Tuple result = new Tuple();
-
- Name name = new Name();
- name.setFirst_name(arguments.getString(0));
- name.setLast_name(arguments.getString(1));
-
- result.add(name);
- functionCall.getOutputCollector().add(result);
- }
- }
-
- private static class UnpackThriftFunction extends BaseOperation implements Function {
- @Override
- public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
- TupleEntry arguments = functionCall.getArguments();
- Tuple result = new Tuple();
-
- Name name = (Name) arguments.get(0);
- result.add(name.getFirst_name());
- result.add(name.getLast_name());
- functionCall.getOutputCollector().add(result);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java
deleted file mode 100644
index 564beaf..0000000
--- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTupleScheme.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.cascading;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.BaseOperation;
-import cascading.operation.Function;
-import cascading.operation.FunctionCall;
-import cascading.pipe.Each;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextLine;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import parquet.hadoop.thrift.ThriftToParquetFileWriter;
-import parquet.hadoop.util.ContextUtil;
-import parquet.thrift.test.Name;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestParquetTupleScheme {
- final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
- final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
-
- @Test
- public void testReadPattern() throws Exception {
- String sourceFolder = parquetInputPath;
- testReadWrite(sourceFolder);
-
- String sourceGlobPattern = parquetInputPath + "/*";
- testReadWrite(sourceGlobPattern);
-
- String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
- testReadWrite(multiLevelGlobPattern);
- }
-
- @Test
- public void testFieldProjection() throws Exception {
- createFileForRead();
-
- Path path = new Path(txtOutputPath);
- final FileSystem fs = path.getFileSystem(new Configuration());
- if (fs.exists(path)) fs.delete(path, true);
-
- Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
- Tap source = new Hfs(sourceScheme, parquetInputPath);
-
- Scheme sinkScheme = new TextLine(new Fields("last_name"));
- Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
- Pipe assembly = new Pipe("namecp");
- assembly = new Each(assembly, new ProjectedTupleFunction());
- Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
- flow.complete();
- String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
- assertEquals("Practice\nHope\nHorse\n", result);
- }
-
- public void testReadWrite(String inputPath) throws Exception {
- createFileForRead();
-
- Path path = new Path(txtOutputPath);
- final FileSystem fs = path.getFileSystem(new Configuration());
- if (fs.exists(path)) fs.delete(path, true);
-
- Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
- Tap source = new Hfs(sourceScheme, inputPath);
-
- Scheme sinkScheme = new TextLine(new Fields("first", "last"));
- Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
- Pipe assembly = new Pipe("namecp");
- assembly = new Each(assembly, new UnpackTupleFunction());
- Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
- flow.complete();
- String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
- assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
- }
-
- private void createFileForRead() throws Exception {
- final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
-
- final Configuration conf = new Configuration();
- final FileSystem fs = fileToCreate.getFileSystem(conf);
- if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
-
- TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
- TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
- ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
- Name n1 = new Name();
- n1.setFirst_name("Alice");
- n1.setLast_name("Practice");
- Name n2 = new Name();
- n2.setFirst_name("Bob");
- n2.setLast_name("Hope");
- Name n3 = new Name();
- n3.setFirst_name("Charlie");
- n3.setLast_name("Horse");
-
- n1.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- baos.reset();
- n2.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- baos.reset();
- n3.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- w.close();
- }
-
- private static class UnpackTupleFunction extends BaseOperation implements Function {
- @Override
- public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
- TupleEntry arguments = functionCall.getArguments();
- Tuple result = new Tuple();
-
- Tuple name = new Tuple();
- name.addString(arguments.getString(0));
- name.addString(arguments.getString(1));
-
- result.add(name);
- functionCall.getOutputCollector().add(result);
- }
- }
-
- private static class ProjectedTupleFunction extends BaseOperation implements Function {
- @Override
- public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
- TupleEntry arguments = functionCall.getArguments();
- Tuple result = new Tuple();
-
- Tuple name = new Tuple();
- name.addString(arguments.getString(0));
-// name.addString(arguments.getString(1));
-
- result.add(name);
- functionCall.getOutputCollector().add(result);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-cascading/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading/src/test/thrift/test.thrift
index 640e124..c58843d 100644
--- a/parquet-cascading/src/test/thrift/test.thrift
+++ b/parquet-cascading/src/test/thrift/test.thrift
@@ -17,7 +17,7 @@
* under the License.
*/
-namespace java parquet.thrift.test
+namespace java org.apache.parquet.thrift.test
struct Name {
1: required string first_name,
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index b8fe029..e50f909 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -18,7 +18,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>com.twitter</groupId>
+ <groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.7.0-incubating-SNAPSHOT</version>
@@ -37,12 +37,12 @@
<dependencies>
<dependency>
- <groupId>com.twitter</groupId>
+ <groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>com.twitter</groupId>
+ <groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>${project.version}</version>
</dependency>
@@ -53,7 +53,7 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>com.twitter</groupId>
+ <groupId>org.apache.parquet</groupId>
<artifactId>parquet-encoding</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
@@ -107,7 +107,7 @@
<relocations>
<relocation>
<pattern>it.unimi.dsi</pattern>
- <shadedPattern>parquet.it.unimi.dsi</shadedPattern>
+ <shadedPattern>org.apache.parquet.it.unimi.dsi</shadedPattern>
</relocation>
</relocations>
</configuration>
@@ -127,7 +127,7 @@
</execution>
</executions>
<configuration>
- <mainClass>parquet.filter2.Generator</mainClass>
+ <mainClass>org.apache.parquet.filter2.Generator</mainClass>
<arguments>
<argument>${basedir}/target/generated-src</argument>
</arguments>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
new file mode 100644
index 0000000..5a44116
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import java.util.Arrays;
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Describes a column's type as well as its position in its containing schema.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ColumnDescriptor implements Comparable<ColumnDescriptor> {
+
+ private final String[] path;
+ private final PrimitiveTypeName type;
+ private final int typeLength;
+ private final int maxRep;
+ private final int maxDef;
+
+ /**
+ *
+ * @param path the path to the leaf field in the schema
+ * @param type the type of the field
+ * @param maxRep the maximum repetition level for that path
+ * @param maxDef the maximum definition level for that path
+ */
+ public ColumnDescriptor(String[] path, PrimitiveTypeName type, int maxRep,
+ int maxDef) {
+ this(path, type, 0, maxRep, maxDef);
+ }
+
+ /**
+ *
+ * @param path the path to the leaf field in the schema
+ * @param type the type of the field
+ * @param maxRep the maximum repetition level for that path
+ * @param maxDef the maximum definition level for that path
+ */
+ public ColumnDescriptor(String[] path, PrimitiveTypeName type,
+ int typeLength, int maxRep, int maxDef) {
+ super();
+ this.path = path;
+ this.type = type;
+ this.typeLength = typeLength;
+ this.maxRep = maxRep;
+ this.maxDef = maxDef;
+ }
+
+ /**
+ * @return the path to the leaf field in the schema
+ */
+ public String[] getPath() {
+ return path;
+ }
+
+ /**
+ * @return the maximum repetition level for that path
+ */
+ public int getMaxRepetitionLevel() {
+ return maxRep;
+ }
+
+ /**
+ * @return the maximum definition level for that path
+ */
+ public int getMaxDefinitionLevel() {
+ return maxDef;
+ }
+
+ /**
+ * @return the type of that column
+ */
+ public PrimitiveTypeName getType() {
+ return type;
+ }
+
+ /**
+ * @return the size of the type
+ **/
+ public int getTypeLength() {
+ return typeLength;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(path);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return Arrays.equals(path, ((ColumnDescriptor)obj).path);
+ }
+
+ @Override
+ public int compareTo(ColumnDescriptor o) {
+ // TODO(julien): this will fail if o.path.length < this.path.length
+ for (int i = 0; i < path.length; i++) {
+ int compareTo = path[i].compareTo(o.path[i]);
+ if (compareTo != 0) {
+ return compareTo;
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(path) + " " + type;
+ }
+}
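As a rough illustration of the constructor above (not part of this commit), a descriptor for a binary leaf under one repeated ancestor; the path and level values are made up:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    // leaf "links.forward": one repeated ancestor -> maxRep = 1,
    // two non-required levels on the path -> maxDef = 2 (illustrative values)
    ColumnDescriptor forward = new ColumnDescriptor(
        new String[] {"links", "forward"},
        PrimitiveTypeName.BINARY,
        1,   // max repetition level
        2);  // max definition level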
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java
new file mode 100644
index 0000000..813666a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReadStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+/**
+ * Container which can produce a ColumnReader for any given column in a schema.
+ *
+ * @author Julien Le Dem
+ */
+public interface ColumnReadStore {
+
+ /**
+ * @param path the column to read
+ * @return the column reader for that descriptor
+ */
+ abstract public ColumnReader getColumnReader(ColumnDescriptor path);
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
new file mode 100644
index 0000000..f802c92
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnReader.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Reader for (repetition level, definition level, values) triplets.
+ * At any given point in time, a ColumnReader points to a single (r, d, v) triplet.
+ * In order to move to the next triplet, call {@link #consume()}.
+ *
+ * Depending on the type and the encoding of the column, only a subset of the get* methods is implemented.
+ * Dictionary specific methods enable the upper layers to read the dictionary IDs without decoding the data.
+ * In particular the Converter will decode the strings in the dictionary only once and iterate on the
+ * dictionary IDs instead of the values.
+ *
+ * <ul>Each iteration looks at the current definition level and value as well as the next
+ * repetition level:
+ * <li> The current definition level defines if the value is null.</li>
+ * <li> If the value is defined we can read it with the correct get*() method.</li>
+ * <li> Looking ahead at the next repetition level determines the next column to read in the FSA.</li>
+ * </ul>
+ * @author Julien Le Dem
+ */
+public interface ColumnReader {
+
+ /**
+ * @return the totalCount of values to be consumed
+ */
+ long getTotalValueCount();
+
+ /**
+ * Consume the current triplet, moving to the next value.
+ */
+ void consume();
+
+ /**
+ * must return 0 when isFullyConsumed() == true
+ * @return the repetition level for the current value
+ */
+ int getCurrentRepetitionLevel();
+
+ /**
+ * @return the definition level for the current value
+ */
+ int getCurrentDefinitionLevel();
+
+ /**
+ * writes the current value to the converter
+ */
+ void writeCurrentValueToConverter();
+
+ /**
+ * Skip the current value
+ */
+ void skip();
+
+ /**
+ * available when the underlying encoding is dictionary based
+ * @return the dictionary id for the current value
+ */
+ int getCurrentValueDictionaryID();
+
+ /**
+ * @return the current value
+ */
+ int getInteger();
+
+ /**
+ * @return the current value
+ */
+ boolean getBoolean();
+
+ /**
+ * @return the current value
+ */
+ long getLong();
+
+ /**
+ * @return the current value
+ */
+ Binary getBinary();
+
+ /**
+ * @return the current value
+ */
+ float getFloat();
+
+ /**
+ * @return the current value
+ */
+ double getDouble();
+
+ /**
+ * @return Descriptor of the column.
+ */
+ ColumnDescriptor getDescriptor();
+
+}
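A rough sketch (not part of this commit) of the triplet iteration described in the javadoc above; 'store' is an assumed ColumnReadStore and 'desc' an assumed ColumnDescriptor for an INT64 column:

    ColumnReader reader = store.getColumnReader(desc);
    int maxDef = desc.getMaxDefinitionLevel();
    for (long i = 0, n = reader.getTotalValueCount(); i < n; i++) {
      int r = reader.getCurrentRepetitionLevel();  // r == 0 marks the start of a new record
      int d = reader.getCurrentDefinitionLevel();
      if (d == maxDef) {
        long value = reader.getLong();             // use the getter matching the column's type
        // ... consume value ...
      }                                            // otherwise the value is null at this position
      reader.consume();                            // advance to the next (r, d, v) triplet
    }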
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
new file mode 100644
index 0000000..739c00f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+/**
+ * Container which can construct writers for multiple columns to be stored
+ * together.
+ *
+ * @author Julien Le Dem
+ */
+public interface ColumnWriteStore {
+ /**
+ * @param path the column for which to create a writer
+ * @return the column writer for the given column
+ */
+ abstract public ColumnWriter getColumnWriter(ColumnDescriptor path);
+
+ /**
+ * flushes to the underlying storage when we are done writing
+ */
+ abstract public void flush();
+
+ /**
+ * called to notify of record boundaries
+ */
+ abstract public void endRecord();
+
+ /**
+ * used for information
+ * @return approximate size used in memory
+ */
+ abstract public long getAllocatedSize();
+
+ /**
+ * used to flush row groups to disk
+ * @return approximate size of the buffered encoded binary data
+ */
+ abstract public long getBufferedSize();
+
+ /**
+ * used for debugging purposes
+ * @return a formatted string representing memory usage per column
+ */
+ abstract public String memUsageString();
+}
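A rough sketch (not part of this commit) of the write-store lifecycle implied by the interface above; 'writeStore', 'desc' and 'rowGroupSize' are assumed names:

    ColumnWriter writer = writeStore.getColumnWriter(desc);    // one writer per column
    writer.write(42L, 0, desc.getMaxDefinitionLevel());        // write one value for this record
    writeStore.endRecord();                                     // record boundary across all columns
    if (writeStore.getBufferedSize() > rowGroupSize) {          // buffered encoded size vs. target row group size
      writeStore.flush();                                       // flush buffered data to the underlying storage
    }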
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
new file mode 100644
index 0000000..7605c50
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * writer for (repetition level, definition level, values) triplets
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface ColumnWriter {
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void write(int value, int repetitionLevel, int definitionLevel);
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void write(long value, int repetitionLevel, int definitionLevel);
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void write(boolean value, int repetitionLevel, int definitionLevel);
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void write(Binary value, int repetitionLevel, int definitionLevel);
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void write(float value, int repetitionLevel, int definitionLevel);
+
+ /**
+ * writes the current value
+ * @param value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void write(double value, int repetitionLevel, int definitionLevel);
+
+ /**
+ * writes the current null value
+ * @param repetitionLevel
+ * @param definitionLevel
+ */
+ void writeNull(int repetitionLevel, int definitionLevel);
+
+}
+
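To make the repetition/definition parameters above concrete, a tiny sketch (not part of this commit) for a flat optional INT32 column, where maxRep = 0 and maxDef = 1; 'writer' is an assumed ColumnWriter:

    writer.write(42, 0, 1);    // defined value: definition level equals maxDef
    writer.writeNull(0, 0);    // null value: definition level below maxDef, no value stored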
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java
new file mode 100644
index 0000000..45fd42d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Dictionary.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * a dictionary to decode dictionary based encodings
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class Dictionary {
+
+ private final Encoding encoding;
+
+ public Dictionary(Encoding encoding) {
+ this.encoding = encoding;
+ }
+
+ public Encoding getEncoding() {
+ return encoding;
+ }
+
+ public abstract int getMaxId();
+
+ public Binary decodeToBinary(int id) {
+ throw new UnsupportedOperationException(this.getClass().getName());
+ }
+
+ public int decodeToInt(int id) {
+ throw new UnsupportedOperationException(this.getClass().getName());
+ }
+
+ public long decodeToLong(int id) {
+ throw new UnsupportedOperationException(this.getClass().getName());
+ }
+
+ public float decodeToFloat(int id) {
+ throw new UnsupportedOperationException(this.getClass().getName());
+ }
+
+ public double decodeToDouble(int id) {
+ throw new UnsupportedOperationException(this.getClass().getName());
+ }
+
+ public boolean decodeToBoolean(int id) {
+ throw new UnsupportedOperationException(this.getClass().getName());
+ }
+}
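A rough sketch (not part of this commit) of decoding each dictionary entry once and then looking values up by ID, as the ColumnReader javadoc describes; 'dictionary' and 'reader' are assumed to exist:

    String[] decoded = new String[dictionary.getMaxId() + 1];
    for (int id = 0; id <= dictionary.getMaxId(); id++) {
      decoded[id] = dictionary.decodeToBinary(id).toStringUsingUTF8();
    }
    // later, while reading a dictionary-encoded page:
    String value = decoded[reader.getCurrentValueDictionaryID()];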
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
new file mode 100644
index 0000000..9770044
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+
+import java.io.IOException;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
+import org.apache.parquet.column.values.boundedint.ZeroIntegerValuesReader;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainBinaryDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainDoubleDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainFloatDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainIntegerDictionary;
+import org.apache.parquet.column.values.dictionary.PlainValuesDictionary.PlainLongDictionary;
+import org.apache.parquet.column.values.plain.BinaryPlainValuesReader;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesReader;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.FloatPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.IntegerPlainValuesReader;
+import org.apache.parquet.column.values.plain.PlainValuesReader.LongPlainValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * encoding of the data
+ *
+ * @author Julien Le Dem
+ *
+ */
+public enum Encoding {
+
+ PLAIN {
+ @Override
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+ switch (descriptor.getType()) {
+ case BOOLEAN:
+ return new BooleanPlainValuesReader();
+ case BINARY:
+ return new BinaryPlainValuesReader();
+ case FLOAT:
+ return new FloatPlainValuesReader();
+ case DOUBLE:
+ return new DoublePlainValuesReader();
+ case INT32:
+ return new IntegerPlainValuesReader();
+ case INT64:
+ return new LongPlainValuesReader();
+ case INT96:
+ return new FixedLenByteArrayPlainValuesReader(12);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new FixedLenByteArrayPlainValuesReader(descriptor.getTypeLength());
+ default:
+ throw new ParquetDecodingException("no plain reader for type " + descriptor.getType());
+ }
+ }
+
+ @Override
+ public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+ switch (descriptor.getType()) {
+ case BINARY:
+ return new PlainBinaryDictionary(dictionaryPage);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new PlainBinaryDictionary(dictionaryPage, descriptor.getTypeLength());
+ case INT96:
+ return new PlainBinaryDictionary(dictionaryPage, 12);
+ case INT64:
+ return new PlainLongDictionary(dictionaryPage);
+ case DOUBLE:
+ return new PlainDoubleDictionary(dictionaryPage);
+ case INT32:
+ return new PlainIntegerDictionary(dictionaryPage);
+ case FLOAT:
+ return new PlainFloatDictionary(dictionaryPage);
+ default:
+ throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
+ }
+
+ }
+ },
+
+ /**
+ * Actually a combination of bit packing and run length encoding.
+ * TODO: Should we rename this to be more clear?
+ */
+ RLE {
+ @Override
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+ int bitWidth = BytesUtils.getWidthFromMaxInt(getMaxLevel(descriptor, valuesType));
+ if(bitWidth == 0) {
+ return new ZeroIntegerValuesReader();
+ }
+ return new RunLengthBitPackingHybridValuesReader(bitWidth);
+ }
+ },
+
+ /**
+ * @deprecated This is no longer used, and has been replaced by {@link #RLE}
+ * which is a combination of bit packing and RLE
+ */
+ @Deprecated
+ BIT_PACKED {
+ @Override
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+ return new ByteBitPackingValuesReader(getMaxLevel(descriptor, valuesType), BIG_ENDIAN);
+ }
+ },
+
+ /**
+ * @deprecated now replaced by RLE_DICTIONARY for the data page encoding and PLAIN for the dictionary page encoding
+ */
+ @Deprecated
+ PLAIN_DICTIONARY {
+ @Override
+ public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+ return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary);
+ }
+
+ @Override
+ public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+ return PLAIN.initDictionary(descriptor, dictionaryPage);
+ }
+
+ @Override
+ public boolean usesDictionary() {
+ return true;
+ }
+
+ },
+
+ /**
+ * Delta encoding for integers. This can be used for int columns and works best
+ * on sorted data
+ */
+ DELTA_BINARY_PACKED {
+ @Override
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+ if(descriptor.getType() != INT32) {
+ throw new ParquetDecodingException("Encoding DELTA_BINARY_PACKED is only supported for type INT32");
+ }
+ return new DeltaBinaryPackingValuesReader();
+ }
+ },
+
+ /**
+ * Encoding for byte arrays to separate the length values and the data. The lengths
+ * are encoded using DELTA_BINARY_PACKED
+ */
+ DELTA_LENGTH_BYTE_ARRAY {
+ @Override
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor,
+ ValuesType valuesType) {
+ if (descriptor.getType() != BINARY) {
+ throw new ParquetDecodingException("Encoding DELTA_LENGTH_BYTE_ARRAY is only supported for type BINARY");
+ }
+ return new DeltaLengthByteArrayValuesReader();
+ }
+ },
+
+ /**
+ * Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.
+ * Suffixes are stored as delta length byte arrays.
+ */
+ DELTA_BYTE_ARRAY {
+ @Override
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor,
+ ValuesType valuesType) {
+ if (descriptor.getType() != BINARY) {
+ throw new ParquetDecodingException("Encoding DELTA_BYTE_ARRAY is only supported for type BINARY");
+ }
+ return new DeltaByteArrayReader();
+ }
+ },
+
+ /**
+ * Dictionary encoding: the ids are encoded using the RLE encoding
+ */
+ RLE_DICTIONARY {
+
+ @Override
+ public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+ switch (descriptor.getType()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ case INT64:
+ case DOUBLE:
+ case INT32:
+ case FLOAT:
+ return new DictionaryValuesReader(dictionary);
+ default:
+ throw new ParquetDecodingException("Dictionary encoding not supported for type: " + descriptor.getType());
+ }
+ }
+
+ @Override
+ public boolean usesDictionary() {
+ return true;
+ }
+
+ };
+
+ int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType) {
+ int maxLevel;
+ switch (valuesType) {
+ case REPETITION_LEVEL:
+ maxLevel = descriptor.getMaxRepetitionLevel();
+ break;
+ case DEFINITION_LEVEL:
+ maxLevel = descriptor.getMaxDefinitionLevel();
+ break;
+ case VALUES:
+ if(descriptor.getType() == BOOLEAN) {
+ maxLevel = 1;
+ break;
+ }
+ default:
+ throw new ParquetDecodingException("Unsupported encoding for values: " + this);
+ }
+ return maxLevel;
+ }
+
+ /**
+ * @return whether this encoding requires a dictionary
+ */
+ public boolean usesDictionary() {
+ return false;
+ }
+
+ /**
+ * initializes a dictionary from a page
+ * @param dictionaryPage
+ * @return the corresponding dictionary
+ */
+ public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage) throws IOException {
+ throw new UnsupportedOperationException(this.name() + " does not support dictionary");
+ }
+
+ /**
+ * To read decoded values that don't require a dictionary
+ *
+ * @param descriptor the column to read
+ * @param valuesType the type of values
+ * @return the proper values reader for the given column
+ * @throws {@link UnsupportedOperationException} if the encoding is dictionary based
+ */
+ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
+ throw new UnsupportedOperationException("Error decoding " + descriptor + ". " + this.name() + " is dictionary based");
+ }
+
+ /**
+ * To read decoded values that require a dictionary
+ *
+ * @param descriptor the column to read
+ * @param valuesType the type of values
+ * @param dictionary the dictionary
+ * @return the proper values reader for the given column
+ * @throws {@link UnsupportedOperationException} if the encoding is not dictionary based
+ */
+ public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary) {
+ throw new UnsupportedOperationException(this.name() + " is not dictionary based");
+ }
+
+}
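A rough sketch (not part of this commit) of how a page reader might pick a values reader via the methods above; 'descriptor', 'dataEncoding' and 'dictionaryPage' are assumed to come from the page metadata, and initDictionary may throw IOException:

    Dictionary dictionary = null;
    if (dictionaryPage != null) {
      dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
    }
    ValuesReader values = dataEncoding.usesDictionary()
        ? dataEncoding.getDictionaryBasedValuesReader(descriptor, ValuesType.VALUES, dictionary)
        : dataEncoding.getValuesReader(descriptor, ValuesType.VALUES);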
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
new file mode 100644
index 0000000..df44c4b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.boundedint.DevNullValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * This class represents all the configurable Parquet properties.
+ *
+ * @author amokashi
+ *
+ */
+public class ParquetProperties {
+
+ public enum WriterVersion {
+ PARQUET_1_0 ("v1"),
+ PARQUET_2_0 ("v2");
+
+ private final String shortName;
+
+ WriterVersion(String shortname) {
+ this.shortName = shortname;
+ }
+
+ public static WriterVersion fromString(String name) {
+ for (WriterVersion v : WriterVersion.values()) {
+ if (v.shortName.equals(name)) {
+ return v;
+ }
+ }
+ // Throws IllegalArgumentException if name does not exactly match an enum name
+ return WriterVersion.valueOf(name);
+ }
+ }
+ private final int dictionaryPageSizeThreshold;
+ private final WriterVersion writerVersion;
+ private final boolean enableDictionary;
+
+ public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
+ this.dictionaryPageSizeThreshold = dictPageSize;
+ this.writerVersion = writerVersion;
+ this.enableDictionary = enableDict;
+ }
+
+ public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
+ if (maxLevel == 0) {
+ return new DevNullValuesWriter();
+ } else {
+ return new RunLengthBitPackingHybridValuesWriter(
+ getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
+ }
+ }
+
+ private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+ switch (path.getType()) {
+ case BOOLEAN:
+ return new BooleanPlainValuesWriter();
+ case INT96:
+ return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
+ case BINARY:
+ case INT32:
+ case INT64:
+ case DOUBLE:
+ case FLOAT:
+ return new PlainValuesWriter(initialSizePerCol, pageSize);
+ default:
+ throw new IllegalArgumentException("Unknown type " + path.getType());
+ }
+ }
+
+ private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initialSizePerCol) {
+ Encoding encodingForDataPage;
+ Encoding encodingForDictionaryPage;
+ switch (writerVersion) {
+ case PARQUET_1_0:
+ encodingForDataPage = PLAIN_DICTIONARY;
+ encodingForDictionaryPage = PLAIN_DICTIONARY;
+ break;
+ case PARQUET_2_0:
+ encodingForDataPage = RLE_DICTIONARY;
+ encodingForDictionaryPage = PLAIN;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown version: " + writerVersion);
+ }
+ switch (path.getType()) {
+ case BOOLEAN:
+ throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
+ case BINARY:
+ return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ case INT32:
+ return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ case INT64:
+ return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ case INT96:
+ return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
+ case DOUBLE:
+ return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ case FLOAT:
+ return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
+ default:
+ throw new IllegalArgumentException("Unknown type " + path.getType());
+ }
+ }
+
+ private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+ switch (writerVersion) {
+ case PARQUET_1_0:
+ return plainWriter(path, initialSizePerCol, pageSize);
+ case PARQUET_2_0:
+ switch (path.getType()) {
+ case BOOLEAN:
+ return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
+ case INT32:
+ return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
+ case INT96:
+ case INT64:
+ case DOUBLE:
+ case FLOAT:
+ return plainWriter(path, initialSizePerCol, pageSize);
+ default:
+ throw new IllegalArgumentException("Unknown type " + path.getType());
+ }
+ default:
+ throw new IllegalArgumentException("Unknown version: " + writerVersion);
+ }
+ }
+
+ private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+ ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
+ if (enableDictionary) {
+ return FallbackValuesWriter.of(
+ dictionaryWriter(path, initialSizePerCol),
+ writerToFallBackTo);
+ } else {
+ return writerToFallBackTo;
+ }
+ }
+
+ public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+ switch (path.getType()) {
+ case BOOLEAN: // no dictionary encoding for boolean
+ return writerToFallbackTo(path, initialSizePerCol, pageSize);
+ case FIXED_LEN_BYTE_ARRAY:
+ // dictionary encoding for that type was not enabled in PARQUET 1.0
+ if (writerVersion == WriterVersion.PARQUET_2_0) {
+ return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
+ } else {
+ return writerToFallbackTo(path, initialSizePerCol, pageSize);
+ }
+ case BINARY:
+ case INT32:
+ case INT64:
+ case INT96:
+ case DOUBLE:
+ case FLOAT:
+ return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
+ default:
+ throw new IllegalArgumentException("Unknown type " + path.getType());
+ }
+ }
+
+ public int getDictionaryPageSizeThreshold() {
+ return dictionaryPageSizeThreshold;
+ }
+
+ public WriterVersion getWriterVersion() {
+ return writerVersion;
+ }
+
+ public boolean isEnableDictionary() {
+ return enableDictionary;
+ }
+
+ public ColumnWriteStore newColumnWriteStore(
+ MessageType schema,
+ PageWriteStore pageStore,
+ int pageSize) {
+ switch (writerVersion) {
+ case PARQUET_1_0:
+ return new ColumnWriteStoreV1(
+ pageStore,
+ pageSize,
+ dictionaryPageSizeThreshold,
+ enableDictionary, writerVersion);
+ case PARQUET_2_0:
+ return new ColumnWriteStoreV2(
+ schema,
+ pageStore,
+ pageSize,
+ new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
+ default:
+ throw new IllegalArgumentException("unknown version " + writerVersion);
+ }
+ }
+}
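
A minimal usage sketch for the ParquetProperties class above (editorial illustration, not part of this commit; the schema, sizes, and class name are hypothetical example values):

    // Sketch: picking a ValuesWriter through ParquetProperties (hypothetical values).
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;
    import org.apache.parquet.column.values.ValuesWriter;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class ParquetPropertiesExample {
      public static void main(String[] args) {
        // 64 KB dictionary page threshold, "v2" writer, dictionary encoding enabled.
        ParquetProperties props =
            new ParquetProperties(64 * 1024, WriterVersion.fromString("v2"), true);

        MessageType schema = MessageTypeParser.parseMessageType(
            "message m { required binary name (UTF8); }");
        ColumnDescriptor name = schema.getColumns().get(0);

        // For a BINARY column under PARQUET_2_0 this returns a dictionary writer
        // that can fall back to DELTA_BYTE_ARRAY if the dictionary grows too large.
        ValuesWriter writer = props.getValuesWriter(name, 16 * 1024, 1024 * 1024);
        System.out.println(writer.getClass().getSimpleName());
      }
    }
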
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java
new file mode 100644
index 0000000..5c05447
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Thrown if the specified column is unknown in the underlying storage
+ *
+ * @author Julien Le Dem
+ */
+public class UnknownColumnException extends ParquetRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ private final ColumnDescriptor descriptor;
+
+ public UnknownColumnException(ColumnDescriptor descriptor) {
+ super("Column not found: " + descriptor.toString());
+ this.descriptor = descriptor;
+ }
+
+ public ColumnDescriptor getDescriptor() {
+ return descriptor;
+ }
+
+}
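
A minimal sketch of how the exception above carries the missing column back to the caller (editorial illustration, not part of this commit; the schema and class name are hypothetical):

    // Sketch: UnknownColumnException keeps the offending ColumnDescriptor.
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.UnknownColumnException;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class UnknownColumnExample {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message m { required int64 id; }");
        ColumnDescriptor id = schema.getColumns().get(0);
        try {
          throw new UnknownColumnException(id);
        } catch (UnknownColumnException e) {
          // getDescriptor() returns the descriptor passed at construction time.
          System.err.println("missing column: " + e.getDescriptor());
        }
      }
    }
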
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java
new file mode 100644
index 0000000..126bc48
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/UnknownColumnTypeException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Thrown if the specified column type is unknown in the underlying storage
+ *
+ * @author Katya Gonina
+ */
+public class UnknownColumnTypeException extends ParquetRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ private final PrimitiveTypeName type;
+
+ public UnknownColumnTypeException(PrimitiveTypeName type) {
+ super("Column type not found: " + type.toString());
+ this.type = type;
+ }
+
+ public PrimitiveTypeName getType() {
+ return this.type;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java b/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java
new file mode 100644
index 0000000..89cf55b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ValuesType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column;
+
+/**
+ * The different types of values we can store in columns
+ *
+ * @author Julien Le Dem
+ *
+ */
+public enum ValuesType {
+ REPETITION_LEVEL, DEFINITION_LEVEL, VALUES;
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
new file mode 100644
index 0000000..bfbcdb9
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Implementation of the ColumnReadStore
+ *
+ * Initializes individual columns based on schema and converter
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ColumnReadStoreImpl implements ColumnReadStore {
+
+ private final PageReadStore pageReadStore;
+ private final GroupConverter recordConverter;
+ private final MessageType schema;
+
+ /**
+ * @param pageReadStore underlying page storage
+ * @param recordConverter the user provided converter to materialize records
+ * @param schema the schema we are reading
+ */
+ public ColumnReadStoreImpl(PageReadStore pageReadStore, GroupConverter recordConverter, MessageType schema) {
+ super();
+ this.pageReadStore = pageReadStore;
+ this.recordConverter = recordConverter;
+ this.schema = schema;
+ }
+
+ @Override
+ public ColumnReader getColumnReader(ColumnDescriptor path) {
+ return newMemColumnReader(path, pageReadStore.getPageReader(path));
+ }
+
+ private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+ PrimitiveConverter converter = getPrimitiveConverter(path);
+ return new ColumnReaderImpl(path, pageReader, converter);
+ }
+
+ private PrimitiveConverter getPrimitiveConverter(ColumnDescriptor path) {
+ Type currentType = schema;
+ Converter currentConverter = recordConverter;
+ for (String fieldName : path.getPath()) {
+ final GroupType groupType = currentType.asGroupType();
+ int fieldIndex = groupType.getFieldIndex(fieldName);
+ currentType = groupType.getType(fieldName);
+ currentConverter = currentConverter.asGroupConverter().getConverter(fieldIndex);
+ }
+ return currentConverter.asPrimitiveConverter();
+ }
+
+}
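
A minimal sketch of the path walk that getPrimitiveConverter performs, shown on the schema side only (editorial illustration, not part of this commit; the schema, path, and class name are hypothetical):

    // Sketch: resolving a column path to child indexes, level by level, the way
    // ColumnReadStoreImpl does before descending the matching converter tree.
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;
    import org.apache.parquet.schema.Type;

    public class ConverterPathWalk {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message m { required group a { required int32 b; } }");
        String[] columnPath = {"a", "b"};

        Type current = schema;
        for (String fieldName : columnPath) {
          GroupType group = current.asGroupType();
          int fieldIndex = group.getFieldIndex(fieldName);
          // ColumnReadStoreImpl uses this index on the converter side:
          // currentConverter = currentConverter.asGroupConverter().getConverter(fieldIndex)
          System.out.println(fieldName + " -> child index " + fieldIndex);
          current = group.getType(fieldName);
        }
        // 'current' is now the primitive type; the parallel converter walk ends at
        // the PrimitiveConverter handed to ColumnReaderImpl.
      }
    }
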