You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by to...@apache.org on 2013/10/09 13:05:17 UTC
git commit: CRUNCH-277. Support Parquet.
Updated Branches:
refs/heads/master 910b6afbe -> f47e778b7
CRUNCH-277. Support Parquet.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f47e778b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f47e778b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f47e778b
Branch: refs/heads/master
Commit: f47e778b78e7d667556cfa0f9ff0c3a936e91e3e
Parents: 910b6af
Author: Tom White <to...@apache.org>
Authored: Wed Oct 9 12:04:43 2013 +0100
Committer: Tom White <to...@apache.org>
Committed: Wed Oct 9 12:04:43 2013 +0100
----------------------------------------------------------------------
crunch-core/pom.xml | 5 +
.../parquet/AvroParquetFileSourceTargetIT.java | 114 +++++++++
.../io/parquet/AvroParquetPipelineIT.java | 237 +++++++++++++++++++
.../crunch/io/parquet/AvroParquetConverter.java | 59 +++++
.../parquet/AvroParquetFileReaderFactory.java | 100 ++++++++
.../io/parquet/AvroParquetFileSource.java | 69 ++++++
.../io/parquet/AvroParquetFileSourceTarget.java | 41 ++++
.../io/parquet/AvroParquetFileTarget.java | 119 ++++++++++
.../AvroParquetFileReaderFactoryTest.java | 104 ++++++++
pom.xml | 13 +
10 files changed, 861 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index 129990f..8a5beb6 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -54,6 +54,11 @@ under the License.
</dependency>
<dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-avro</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
new file mode 100644
index 0000000..b6d51f2
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetWriter;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests that write a Parquet file directly with {@link AvroParquetWriter}
+ * and then read it back through an {@link AvroParquetFileSource} inside an
+ * {@link MRPipeline}, for both specific ({@link Person}) and generic records.
+ */
+@SuppressWarnings("serial")
+public class AvroParquetFileSourceTargetIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro.parquet");
+  }
+
+  /**
+   * Writes the given records to {@code avroFile} as Parquet using the supplied schema.
+   *
+   * @param genericRecords records to write, in order
+   * @param schema Avro schema handed to the Parquet writer
+   * @throws IOException if creating, writing to or closing the writer fails
+   */
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+        new Path(avroFile.getPath()), schema);
+    // Close in a finally block so a failed write does not leak the open writer.
+    try {
+      for (GenericRecord record : genericRecords) {
+        writer.write(record);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Reads a file written with the Person schema back as specific {@link Person} objects. */
+  @Test
+  public void testSpecific() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> people = pipeline.read(
+        new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
+
+    List<Person> personList = Lists.newArrayList(people.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.name = "John Doe";
+    expectedPerson.age = 42;
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.siblingnames = siblingNames;
+
+    // personList is already a fresh list; no second copy is needed for the comparison.
+    assertEquals(Lists.newArrayList(expectedPerson), personList);
+  }
+
+  /**
+   * Reads a file back as generic records, using a schema derived from the Person schema
+   * with every occurrence of "Person" replaced by "GenericPerson".
+   */
+  @Test
+  public void testGeneric() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Record> genericCollection = pipeline.read(
+        new AvroParquetFileSource<Record>(new Path(avroFile.getAbsolutePath()),
+            Avros.generics(genericPersonSchema)));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), recordList);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java
new file mode 100644
index 0000000..055d0d7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetReader;
+import parquet.avro.AvroParquetWriter;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class AvroParquetPipelineIT implements Serializable {
+
+ private transient File avroFile;
+ @Rule
+ public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+ @Before
+ public void setUp() throws IOException {
+ avroFile = tmpDir.getFile("test.avro.parquet");
+ }
+
+ private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+ FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+ GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+ DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+ dataFileWriter.create(schema, outputStream);
+
+ for (GenericRecord record : genericRecords) {
+ dataFileWriter.append(record);
+ }
+
+ dataFileWriter.close();
+ outputStream.close();
+ }
+
+ private void populateGenericParquetFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+ AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+ new Path(avroFile.getPath()), schema);
+
+ for (GenericRecord record : genericRecords) {
+ writer.write(record);
+ }
+
+ writer.close();
+ }
+
+ @Test
+ public void toAvroParquetFileTarget() throws Exception {
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+ Avros.records(Person.class)));
+ File outputFile = tmpDir.getFile("output");
+ Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+ pipeline.write(genericCollection, parquetFileTarget);
+ pipeline.run();
+
+ Person person = genericCollection.materialize().iterator().next();
+
+ Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
+
+ AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+ try {
+ Person readPerson = reader.read();
+ assertThat(readPerson, is(person));
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Test
+ public void toAvroParquetFileTargetFromParquet() throws Exception {
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<Person> genericCollection = pipeline.read(
+ new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
+ File outputFile = tmpDir.getFile("output");
+ Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+ pipeline.write(genericCollection, parquetFileTarget);
+ pipeline.run();
+
+ Person person = genericCollection.materialize().iterator().next();
+
+ Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
+
+ AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+ try {
+ Person readPerson = reader.read();
+ assertThat(readPerson, is(person));
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Test
+ public void toAvroParquetFileMultipleTarget() throws Exception {
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+ Avros.records(Person.class)));
+
+ PCollection<Employee> employees = genericCollection.parallelDo(new DoFn<Person, Employee>() {
+ @Override
+ public void process(Person person, Emitter<Employee> emitter) {
+ emitter.emit(new Employee(person.getName(), 0, "Eng"));
+ }
+ }, Avros.records(Employee.class));
+
+ File output1File = tmpDir.getFile("output1");
+ File output2File = tmpDir.getFile("output2");
+ pipeline.write(genericCollection, new AvroParquetFileTarget(output1File.getAbsolutePath()));
+ pipeline.write(employees, new AvroParquetFileSourceTarget(new Path(output2File.getAbsolutePath()),
+ Avros.records(Employee.class)));
+ pipeline.run();
+
+ Person person = genericCollection.materialize().iterator().next();
+ Employee employee = employees.materialize().iterator().next();
+
+ Path parquet1File = new Path(new File(output1File, "part-m-00000.parquet").getPath());
+ Path parquet2File = new Path(new File(output2File, "part-m-00000.parquet").getPath());
+
+ AvroParquetReader<Person> personReader = new AvroParquetReader<Person>(parquet1File);
+
+ try {
+ Person readPerson = personReader.read();
+ assertThat(readPerson, is(person));
+ } finally {
+ personReader.close();
+ }
+
+ AvroParquetReader<Employee> employeeReader = new AvroParquetReader<Employee>(parquet2File);
+
+ try {
+ Employee readEmployee = employeeReader.read();
+ assertThat(readEmployee, is(employee));
+ } finally {
+ employeeReader.close();
+ }
+
+ }
+
+ @Test
+ public void toAvroParquetFileTargetReadSource() throws Exception {
+ GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+ Avros.records(Person.class)));
+ File outputFile = tmpDir.getFile("output");
+ Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
+ pipeline.write(genericCollection, parquetFileTarget);
+ pipeline.run();
+
+ Person person = genericCollection.materialize().iterator().next();
+
+ PCollection<Person> retrievedPeople = pipeline.read(new AvroParquetFileSource<Person>(
+ new Path(outputFile.toURI()), Avros.records(Person.class)));
+
+ Person retrievedPerson = retrievedPeople.materialize().iterator().next();
+
+ assertThat(retrievedPerson, is(person));
+
+ Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
+
+ AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
+
+ try {
+ Person readPerson = reader.read();
+ assertThat(readPerson, is(person));
+ } finally {
+ reader.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java
new file mode 100644
index 0000000..5cb231f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetConverter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.avro.AvroType;
+
+/**
+ * A pass-through {@link Converter} for Parquet-backed Avro data: values are handed
+ * through unchanged in both directions and the key is always {@code null}
+ * (key class {@link Void}).
+ *
+ * @param <T> the value type described by the wrapped {@link AvroType}
+ */
+class AvroParquetConverter<T> implements Converter<Void, T, T, Iterable<T>> {
+  // Assigned once in the constructor and never changed afterwards.
+  private final AvroType<T> ptype;
+
+  /**
+   * @param ptype the Avro type whose type class is reported by {@link #getValueClass()}
+   */
+  public AvroParquetConverter(AvroType<T> ptype) {
+    this.ptype = ptype;
+  }
+
+  /** Returns {@code value} unchanged; {@code key} is ignored. */
+  @Override
+  public T convertInput(Void key, T value) {
+    return value;
+  }
+
+  /** Returns {@code value} unchanged; {@code key} is ignored. */
+  @Override
+  public Iterable<T> convertIterableInput(Void key, Iterable<T> value) {
+    return value;
+  }
+
+  /** Always returns {@code null}: records are written without a key. */
+  @Override
+  public Void outputKey(T value) {
+    return null;
+  }
+
+  /** Returns {@code value} unchanged. */
+  @Override
+  public T outputValue(T value) {
+    return value;
+  }
+
+  @Override
+  public Class<Void> getKeyClass() {
+    return Void.class;
+  }
+
+  /** Returns the type class reported by the wrapped {@link AvroType}. */
+  @Override
+  public Class<T> getValueClass() {
+    return ptype.getTypeClass();
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java
new file mode 100644
index 0000000..c193563
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import com.google.common.collect.UnmodifiableIterator;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.avro.AvroReadSupport;
+import parquet.hadoop.ParquetReader;
+import parquet.schema.MessageType;
+
+class AvroParquetFileReaderFactory<T> implements FileReaderFactory<T> {
+
+ private AvroType<T> avroType;
+
+ public AvroParquetFileReaderFactory(AvroType<T> avroType) {
+ this.avroType = avroType;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Iterator<T> read(FileSystem fs, Path path) {
+ Path p = fs.makeQualified(path);
+ final ParquetReader reader;
+ try {
+ reader = new ParquetReader(p, new CrunchAvroReadSupport(avroType));
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+
+ private T next;
+
+ @Override
+ public boolean hasNext() {
+ if (next != null) {
+ return true;
+ }
+ try {
+ next = (T) reader.read();
+ } catch (IOException e) {
+ throw new CrunchRuntimeException(e);
+ }
+ return next != null;
+ }
+
+ @Override
+ public T next() {
+ if (hasNext()) {
+ T ret = next;
+ next = null;
+ return ret;
+ }
+ throw new NoSuchElementException();
+ }
+ });
+
+ }
+
+ static class CrunchAvroReadSupport<T extends IndexedRecord> extends AvroReadSupport<T> {
+ private AvroType<T> avroType;
+
+ public CrunchAvroReadSupport(AvroType<T> avroType) {
+ this.avroType = avroType;
+ }
+
+ @Override
+ public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+ if (avroType != null) {
+ setRequestedProjection(configuration, avroType.getSchema());
+ }
+ return super.init(configuration, keyValueMetaData, fileSchema);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
new file mode 100644
index 0000000..81678d4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroReadSupport;
+
+public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+ private static <S> FormatBundle<AvroParquetInputFormat> getBundle(AvroType<S> ptype) {
+ return FormatBundle.forInput(AvroParquetInputFormat.class)
+ .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, ptype.getSchema().toString())
+ // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
+ // doesn't work with CombineFileInputFormat
+ .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+ }
+
+ public AvroParquetFileSource(Path path, AvroType<T> ptype) {
+ super(path, ptype, getBundle(ptype));
+ }
+
+ public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
+ super(paths, ptype, getBundle(ptype));
+ }
+
+ @Override
+ public Iterable<T> read(Configuration conf) throws IOException {
+ return read(conf, getFileReaderFactory((AvroType<T>) ptype));
+ }
+
+ protected AvroParquetFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype){
+ return new AvroParquetFileReaderFactory<T>(ptype);
+ }
+
+ @Override
+ public Converter<?, ?, ?, ?> getConverter() {
+ return new AvroParquetConverter<T>((AvroType<T>) ptype);
+ }
+
+ @Override
+ public String toString() {
+ return "Parquet(" + pathsAsString() + ")";
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
new file mode 100644
index 0000000..8d93eba
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Pairs an {@link AvroParquetFileSource} and an {@link AvroParquetFileTarget} that
+ * point at the same path, so the location can be both written to and read from.
+ *
+ * @param <T> the record type read from and written to the path
+ */
+public class AvroParquetFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+
+  /**
+   * Creates a source/target at {@code path} using the {@link SequentialFileNamingScheme}.
+   *
+   * @param path location of the Parquet data
+   * @param atype Avro type of the records
+   */
+  public AvroParquetFileSourceTarget(Path path, AvroType<T> atype) {
+    this(path, atype, SequentialFileNamingScheme.getInstance());
+  }
+
+  /**
+   * @param path location of the Parquet data
+   * @param atype Avro type of the records
+   * @param fileNamingScheme naming scheme passed to the superclass
+   */
+  public AvroParquetFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+    super(
+        new AvroParquetFileSource<T>(path, atype),
+        new AvroParquetFileTarget(path),
+        fileNamingScheme);
+  }
+
+  /** Delegates to the string form of the underlying target. */
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
new file mode 100644
index 0000000..c67b9f1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import parquet.avro.AvroWriteSupport;
+import parquet.hadoop.ParquetOutputFormat;
+
+/**
+ * A file target that writes Avro records as Parquet files via
+ * {@link CrunchAvroParquetOutputFormat}. Only {@link AvroType} collections are accepted.
+ *
+ * <p>The output schema is stored in the job configuration under
+ * {@code parquet.avro.schema}, or {@code parquet.avro.schema.<name>} when an output
+ * name is given, and is read back from there by {@link CrunchAvroWriteSupport}.
+ */
+public class AvroParquetFileTarget extends FileTargetImpl {
+
+  private static final String PARQUET_AVRO_SCHEMA_PARAMETER = "parquet.avro.schema";
+
+  public AvroParquetFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  public AvroParquetFileTarget(Path path) {
+    this(path, SequentialFileNamingScheme.getInstance());
+  }
+
+  public AvroParquetFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, CrunchAvroParquetOutputFormat.class, fileNamingScheme);
+  }
+
+  /**
+   * Returns the configuration key that holds the schema for the given output name.
+   * Shared by the job-setup side and the write-support side so the two cannot drift apart.
+   *
+   * @param name output name, or {@code null} for the unnamed output
+   */
+  private static String schemaParameter(String name) {
+    if (name == null) {
+      return PARQUET_AVRO_SCHEMA_PARAMETER;
+    }
+    return PARQUET_AVRO_SCHEMA_PARAMETER + "." + name;
+  }
+
+  @Override
+  public String toString() {
+    return "Parquet(" + path.toString() + ")";
+  }
+
+  /**
+   * Accepts the collection only when its type is an {@link AvroType}; on acceptance the
+   * handler is configured with this target.
+   */
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+    return new AvroParquetConverter<Object>((AvroType<Object>) ptype);
+  }
+
+  /**
+   * Records the output schema in the job configuration and configures the output format.
+   *
+   * @param job the job being configured
+   * @param ptype type of the written collection; must be an {@link AvroType}
+   * @param outputPath where the output is written
+   * @param name output name, or {@code null}
+   * @throws IllegalStateException if a different schema is already registered under the same key
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+    String schemaParam = schemaParameter(name);
+    String schemaJson = atype.getSchema().toString();
+    String outputSchema = conf.get(schemaParam);
+    if (outputSchema == null) {
+      conf.set(schemaParam, schemaJson);
+    } else if (!outputSchema.equals(schemaJson)) {
+      throw new IllegalStateException("Avro targets must use the same output schema");
+    }
+    configureForMapReduce(job, Void.class, atype.getTypeClass(),
+        CrunchAvroParquetOutputFormat.class, outputPath, name);
+  }
+
+  /** Returns a source/target over the same path for Avro types, otherwise {@code null}. */
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof AvroType) {
+      return new AvroParquetFileSourceTarget<T>(path, (AvroType<T>) ptype);
+    }
+    return null;
+  }
+
+  /**
+   * Write support that, when the {@code crunch.namedoutput} property is set and non-empty,
+   * loads the schema stored for that output name before delegating to the default
+   * initialization.
+   */
+  static class CrunchAvroWriteSupport extends AvroWriteSupport {
+    @Override
+    public WriteContext init(Configuration conf) {
+      // NOTE(review): presumably set by Crunch for each named output - confirm.
+      String outputName = conf.get("crunch.namedoutput");
+      if (outputName != null && !outputName.isEmpty()) {
+        String schemaParam = schemaParameter(outputName);
+        String schema = conf.get(schemaParam);
+        if (schema == null) {
+          // Fail with a descriptive message instead of an NPE from the schema parser.
+          throw new IllegalStateException("No Avro schema configured under '" + schemaParam
+              + "' for named output '" + outputName + "'");
+        }
+        setSchema(conf, new Schema.Parser().parse(schema));
+      }
+      return super.init(conf);
+    }
+  }
+
+  /** Parquet output format wired to {@link CrunchAvroWriteSupport}. */
+  static class CrunchAvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
+
+    public CrunchAvroParquetOutputFormat() {
+      super(new CrunchAvroWriteSupport());
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java
new file mode 100644
index 0000000..9f5ff70
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/parquet/AvroParquetFileReaderFactoryTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.parquet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import parquet.avro.AvroParquetWriter;
+
+/**
+ * Unit test for {@link AvroParquetFileReaderFactory}: writes generic Avro records to a local
+ * Parquet file and reads them back through the factory.
+ */
+public class AvroParquetFileReaderFactoryTest {
+
+  /** Parquet file written and read by the test; assigned in {@link #setUp()}. */
+  private File parquetFile;
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    parquetFile = tmpDir.getFile("test.avro.parquet");
+  }
+
+  @After
+  public void tearDown() {
+    parquetFile.delete();
+  }
+
+  /**
+   * Writes the given records to {@link #parquetFile} with the supplied Avro schema.
+   *
+   * @param genericRecords the records to write, in order
+   * @param schema the Avro schema handed to the Parquet writer
+   * @throws IOException if writing a record or closing the writer fails
+   */
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
+        new Path(parquetFile.getPath()), schema);
+    try {
+      for (GenericRecord record : genericRecords) {
+        writer.write(record);
+      }
+    } finally {
+      // Close even when a write fails so the file handle is not leaked.
+      writer.close();
+    }
+  }
+
+  /** Builds the reader factory under test for the given Avro type. */
+  private <T> AvroParquetFileReaderFactory<T> createFileReaderFactory(AvroType<T> avroType) {
+    return new AvroParquetFileReaderFactory<T>(avroType);
+  }
+
+  /**
+   * Saves one record with {@code name}, {@code age} and {@code siblingnames}, then reads the
+   * file with a projection schema that contains only {@code name}. Expects the name to match,
+   * {@code age} to be absent from the record read back, and exactly one record in the file.
+   */
+  @Test
+  public void testProjection() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    // The projection keeps only the "name" field of the written schema.
+    Schema projection = Schema.createRecord("projection", null, null, false);
+    projection.setFields(Lists.newArrayList(cloneField(genericPersonSchema.getField("name"))));
+    AvroParquetFileReaderFactory<Record> genericReader = createFileReaderFactory(Avros.generics(projection));
+    Iterator<Record> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()),
+        new Path(this.parquetFile.getAbsolutePath()));
+
+    GenericRecord genericRecord = recordIterator.next();
+    assertEquals(savedRecord.get("name"), genericRecord.get("name"));
+    assertNull(genericRecord.get("age"));
+    assertFalse(recordIterator.hasNext());
+  }
+
+  /**
+   * Creates a new {@link Schema.Field} with the same name, schema, doc and default value as
+   * the given field, so the copy can be attached to a different record schema.
+   */
+  public static Schema.Field cloneField(Schema.Field field) {
+    return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/f47e778b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba3258c..45283f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@ under the License.
<commons-logging.version>1.1.1</commons-logging.version>
<commons-cli.version>1.2</commons-cli.version>
<avro.version>1.7.4</avro.version>
+ <parquet.version>1.2.0</parquet.version>
<javassist.version>3.16.1-GA</javassist.version>
<jackson.version>1.8.8</jackson.version>
<protobuf-java.version>2.4.0a</protobuf-java.version>
@@ -209,6 +210,18 @@ under the License.
</dependency>
<dependency>
+ <!-- Parquet Avro bindings, versioned by the parquet.version property. The transitive
+      hadoop-core artifact is excluded. NOTE(review): presumably so the Hadoop version
+      comes from this build's own dependency management; confirm. -->
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>${javassist.version}</version>