Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/01 04:21:56 UTC
[2/2] parquet-mr git commit: PARQUET-480: Update for Cascading 3.0
PARQUET-480: Update for Cascading 3.0
The code in parquet-cascading is written against the Cascading 2.5.3 API.
Cascading 3.0 introduced some incompatible changes. This patch forks the parquet-cascading module to also provide a parquet-cascading3 module, which is nearly identical except for overloads whose parameters changed from Foo<JobConf> to Foo<? extends JobConf>.
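To illustrate the Foo<JobConf> versus Foo<? extends JobConf> difference, here is a minimal, self-contained sketch. The classes JobConf, SubJobConf and FlowProcess below are hypothetical stand-ins defined only for this example; they are not the real Hadoop or Cascading types, and the method names are made up.

```java
public class WildcardSignatureSketch {
    // Hypothetical stand-ins, NOT the real Hadoop/Cascading classes.
    public static class JobConf {
        public String name() { return "JobConf"; }
    }

    public static class SubJobConf extends JobConf {
        @Override
        public String name() { return "SubJobConf"; }
    }

    public static class FlowProcess<C> {
        private final C config;
        public FlowProcess(C config) { this.config = config; }
        public C getConfig() { return config; }
    }

    // Exact type argument: only FlowProcess<JobConf> is accepted.
    public static String describeExact(FlowProcess<JobConf> process) {
        return "exact:" + process.getConfig().name();
    }

    // Bounded wildcard: FlowProcess of JobConf or of any subclass is accepted.
    public static String describeWildcard(FlowProcess<? extends JobConf> process) {
        return "wildcard:" + process.getConfig().name();
    }

    public static void main(String[] args) {
        FlowProcess<JobConf> plain = new FlowProcess<>(new JobConf());
        FlowProcess<SubJobConf> sub = new FlowProcess<>(new SubJobConf());

        System.out.println(describeExact(plain));    // exact:JobConf
        System.out.println(describeWildcard(plain)); // wildcard:JobConf
        System.out.println(describeWildcard(sub));   // wildcard:SubJobConf
        // describeExact(sub) would not compile: Java generics are invariant,
        // so FlowProcess<SubJobConf> is not a FlowProcess<JobConf>.
    }
}
```

In Java, a method that takes Foo<JobConf> does not override one declared with Foo<? extends JobConf>, and the two have the same erasure, so a subclass cannot declare both. That is presumably why a single source file cannot target both Cascading versions and the module was forked.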
Author: Cyrille Chépélov (TP12) <cc...@transparencyrights.com>
Closes #284 from cchepelov/try_cascading3 and squashes the following commits:
e7d1304 [Cyrille Chépélov (TP12)] Adding a @Deprecated notice on parquet-cascading's remaining classes
05a417d [Cyrille Chépélov (TP12)] cascading2/3: share back TupleWriteSupport.java (accidentally unmerged)
7fff2d4 [Cyrille Chépélov (TP12)] cascading/cascading3: remove duplicates, push common files into parquet-cascading-common23
338a416 [Cyrille Chépélov (TP12)] Removing unwanted file (what?!) + .gitignoring this kind of files
d9f0455 [Cyrille Chépélov (TP12)] TupleEntry#get is now TupleEntry#getObject
a7f490a [Cyrille Chépélov (TP12)] Revert "Missing test conversion to Cascading 3.0"
cc8b870 [Cyrille Chépélov (TP12)] Missing test conversion to Cascading 3.0
2d73512 [Cyrille Chépélov (TP12)] conflicting values can come in one order or the other. Accept both.
33355d5 [Cyrille Chépélov (TP12)] Fix version mismatch (duh!)
7128639 [Cyrille Chépélov (TP12)] non-C locale can break tests implementation (decimal formats)
53aa2f9 [Cyrille Chépélov (TP12)] Adding a parquet-cascading3 module (forking the parquet-cascading module and accounting for API changes)
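Commit 7128639 above says a non-C locale can break tests through decimal formats, and the README change in this patch prefixes the build with LC_ALL=C. The commit does not say which test was affected, so the following is only a generic sketch of that failure mode in Java; the class and method names are hypothetical.

```java
import java.util.Locale;

public class LocaleDecimalSketch {
    // Formats a value with two decimals using an explicit locale.
    public static String format(Locale locale, double value) {
        return String.format(locale, "%.2f", value);
    }

    public static void main(String[] args) {
        // Locale-neutral formatting uses '.' as the decimal separator.
        System.out.println(format(Locale.ROOT, 1.5));   // 1.50
        // A French locale uses ',' instead, so a test that compares
        // against the literal "1.50" would fail under it.
        System.out.println(format(Locale.FRANCE, 1.5)); // 1,50
    }
}
```

Code that calls String.format without a Locale argument picks up the JVM default locale, which is typically derived from the environment. Forcing LC_ALL=C for the build is one way to keep such output stable; passing an explicit locale in the code is another.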
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/57694790
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/57694790
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/57694790
Branch: refs/heads/master
Commit: 57694790f8ca0e1a4f3ac76fbd25a6dd13041e03
Parents: af9fd05
Author: Cyrille Chépélov (TP12) <cc...@transparencyrights.com>
Authored: Sun Jan 31 19:21:48 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Sun Jan 31 19:21:48 2016 -0800
----------------------------------------------------------------------
.gitignore | 3 +
README.md | 2 +-
.../parquet/cascading/SchemaIntersection.java | 63 ++++++
.../parquet/cascading/TupleReadSupport.java | 80 ++++++++
.../parquet/cascading/TupleWriteSupport.java | 111 +++++++++++
.../cascading/convert/TupleConverter.java | 115 +++++++++++
.../convert/TupleRecordMaterializer.java | 46 +++++
.../cascading/TestParquetTupleScheme.java | 182 ++++++++++++++++++
.../src/test/resources/names.txt | 3 +
.../src/test/thrift/test.thrift | 25 +++
parquet-cascading/pom.xml | 47 +++++
.../parquet/cascading/ParquetTBaseScheme.java | 1 +
.../parquet/cascading/ParquetTupleScheme.java | 1 +
.../parquet/cascading/ParquetValueScheme.java | 1 +
.../parquet/cascading/SchemaIntersection.java | 63 ------
.../parquet/cascading/TupleReadSupport.java | 80 --------
.../parquet/cascading/TupleWriteSupport.java | 111 -----------
.../cascading/convert/TupleConverter.java | 115 -----------
.../convert/TupleRecordMaterializer.java | 46 -----
.../cascading/TestParquetTBaseScheme.java | 3 +-
.../cascading/TestParquetTupleScheme.java | 182 ------------------
parquet-cascading/src/test/resources/names.txt | 3 -
parquet-cascading/src/test/thrift/test.thrift | 25 ---
parquet-cascading3/REVIEWERS.md | 27 +++
parquet-cascading3/pom.xml | 178 +++++++++++++++++
.../parquet/cascading/ParquetTBaseScheme.java | 80 ++++++++
.../parquet/cascading/ParquetTupleScheme.java | 191 +++++++++++++++++++
.../parquet/cascading/ParquetValueScheme.java | 184 ++++++++++++++++++
.../cascading/TestParquetTBaseScheme.java | 186 ++++++++++++++++++
.../parquet/hadoop/TestMergeMetadataFiles.java | 10 +-
parquet_cascading.md | 13 ++
pom.xml | 26 +++
32 files changed, 1574 insertions(+), 629 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index cd3c066..aa67d3d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,6 @@ dependency-reduced-pom.xml
parquet-scrooge/.cache
.idea/*
target/
+.cache
+*~
+mvn_install.log
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 2d9a50a..9bb0be6 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,7 @@ sudo make install
Once protobuf and thrift are available in your path, you can build the project by running:
```
-mvn clean install
+LC_ALL=C mvn clean install
```
## Features
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
new file mode 100644
index 0000000..e3fc3f7
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import cascading.tuple.Fields;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class SchemaIntersection {
+
+ private final MessageType requestedSchema;
+ private final Fields sourceFields;
+
+ public SchemaIntersection(MessageType fileSchema, Fields requestedFields) {
+ if(requestedFields == Fields.UNKNOWN)
+ requestedFields = Fields.ALL;
+
+ Fields newFields = Fields.NONE;
+ List<Type> newSchemaFields = new ArrayList<Type>();
+ int schemaSize = fileSchema.getFieldCount();
+
+ for (int i = 0; i < schemaSize; i++) {
+ Type type = fileSchema.getType(i);
+ Fields name = new Fields(type.getName());
+
+ if(requestedFields.contains(name)) {
+ newFields = newFields.append(name);
+ newSchemaFields.add(type);
+ }
+ }
+
+ this.sourceFields = newFields;
+ this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields);
+ }
+
+ public MessageType getRequestedSchema() {
+ return requestedSchema;
+ }
+
+ public Fields getSourceFields() {
+ return sourceFields;
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
new file mode 100644
index 0000000..42a5926
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.commons.lang.StringUtils;
+
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+import cascading.flow.hadoop.util.HadoopUtil;
+
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.cascading.convert.TupleRecordMaterializer;
+
+
+public class TupleReadSupport extends ReadSupport<Tuple> {
+ static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields";
+
+ static protected Fields getRequestedFields(Configuration configuration) {
+ String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS);
+
+ if(fieldsString == null)
+ return Fields.ALL;
+
+ String[] parts = StringUtils.split(fieldsString, ":");
+ if(parts.length == 0)
+ return Fields.ALL;
+ else
+ return new Fields(parts);
+ }
+
+ static protected void setRequestedFields(JobConf configuration, Fields fields) {
+ String fieldsString = StringUtils.join(fields.iterator(), ":");
+ configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString);
+ }
+
+ @Override
+ public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+ Fields requestedFields = getRequestedFields(configuration);
+ if (requestedFields == null) {
+ return new ReadContext(fileSchema);
+ } else {
+ SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields);
+ return new ReadContext(intersection.getRequestedSchema());
+ }
+ }
+
+ @Override
+ public RecordMaterializer<Tuple> prepareForRead(
+ Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadContext readContext) {
+ MessageType requestedSchema = readContext.getRequestedSchema();
+ return new TupleRecordMaterializer(requestedSchema);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
new file mode 100644
index 0000000..032f534
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.tuple.TupleEntry;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+/**
+ *
+ *
+ * @author Mickaël Lacour <m....@criteo.com>
+ */
+public class TupleWriteSupport extends WriteSupport<TupleEntry> {
+
+ private RecordConsumer recordConsumer;
+ private MessageType rootSchema;
+ public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
+
+ @Override
+ public String getName() {
+ return "cascading";
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
+ rootSchema = MessageTypeParser.parseMessageType(schema);
+ return new WriteContext(rootSchema, new HashMap<String, String>());
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public void write(TupleEntry record) {
+ recordConsumer.startMessage();
+ final List<Type> fields = rootSchema.getFields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ Type field = fields.get(i);
+
+ if (record == null || record.getObject(field.getName()) == null) {
+ continue;
+ }
+ recordConsumer.startField(field.getName(), i);
+ if (field.isPrimitive()) {
+ writePrimitive(record, field.asPrimitiveType());
+ } else {
+ throw new UnsupportedOperationException("Complex type not implemented");
+ }
+ recordConsumer.endField(field.getName(), i);
+ }
+ recordConsumer.endMessage();
+ }
+
+ private void writePrimitive(TupleEntry record, PrimitiveType field) {
+ switch (field.getPrimitiveTypeName()) {
+ case BINARY:
+ recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
+ break;
+ case BOOLEAN:
+ recordConsumer.addBoolean(record.getBoolean(field.getName()));
+ break;
+ case INT32:
+ recordConsumer.addInteger(record.getInteger(field.getName()));
+ break;
+ case INT64:
+ recordConsumer.addLong(record.getLong(field.getName()));
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(record.getDouble(field.getName()));
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(record.getFloat(field.getName()));
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ throw new UnsupportedOperationException("Fixed len byte array type not implemented");
+ case INT96:
+ throw new UnsupportedOperationException("Int96 type not implemented");
+ default:
+ throw new UnsupportedOperationException(field.getName() + " type not implemented");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
new file mode 100644
index 0000000..3741165
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading.convert;
+
+import cascading.tuple.Tuple;
+
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.pig.TupleConversionException;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+public class TupleConverter extends GroupConverter {
+
+ protected Tuple currentTuple;
+ private final Converter[] converters;
+
+ public TupleConverter(GroupType parquetSchema) {
+ int schemaSize = parquetSchema.getFieldCount();
+
+ this.converters = new Converter[schemaSize];
+ for (int i = 0; i < schemaSize; i++) {
+ Type type = parquetSchema.getType(i);
+ converters[i] = newConverter(type, i);
+ }
+ }
+
+ private Converter newConverter(Type type, int i) {
+ if(!type.isPrimitive()) {
+ throw new IllegalArgumentException("cascading can only build tuples from primitive types");
+ } else {
+ return new TuplePrimitiveConverter(this, i);
+ }
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return converters[fieldIndex];
+ }
+
+ @Override
+ final public void start() {
+ currentTuple = Tuple.size(converters.length);
+ }
+
+ @Override
+ public void end() {
+ }
+
+ final public Tuple getCurrentTuple() {
+ return currentTuple;
+ }
+
+ static final class TuplePrimitiveConverter extends PrimitiveConverter {
+ private final TupleConverter parent;
+ private final int index;
+
+ public TuplePrimitiveConverter(TupleConverter parent, int index) {
+ this.parent = parent;
+ this.index = index;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ parent.getCurrentTuple().setBoolean(index, value);
+ }
+
+ @Override
+ public void addDouble(double value) {
+ parent.getCurrentTuple().setDouble(index, value);
+ }
+
+ @Override
+ public void addFloat(float value) {
+ parent.getCurrentTuple().setFloat(index, value);
+ }
+
+ @Override
+ public void addInt(int value) {
+ parent.getCurrentTuple().setInteger(index, value);
+ }
+
+ @Override
+ public void addLong(long value) {
+ parent.getCurrentTuple().setLong(index, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
new file mode 100644
index 0000000..275e17b
--- /dev/null
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading.convert;
+
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+
+public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
+
+ private TupleConverter root;
+
+ public TupleRecordMaterializer(GroupType parquetSchema) {
+ this.root = new TupleConverter(parquetSchema);
+ }
+
+ @Override
+ public Tuple getCurrentRecord() {
+ return root.getCurrentTuple();
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return root;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
new file mode 100644
index 0000000..de350dd
--- /dev/null
+++ b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowConnector;
+import cascading.operation.BaseOperation;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.pipe.Each;
+import cascading.pipe.Pipe;
+import cascading.scheme.Scheme;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.junit.Test;
+import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.thrift.test.Name;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetTupleScheme {
+ final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
+ final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
+
+ @Test
+ public void testReadPattern() throws Exception {
+ String sourceFolder = parquetInputPath;
+ testReadWrite(sourceFolder);
+
+ String sourceGlobPattern = parquetInputPath + "/*";
+ testReadWrite(sourceGlobPattern);
+
+ String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
+ testReadWrite(multiLevelGlobPattern);
+ }
+
+ @Test
+ public void testFieldProjection() throws Exception {
+ createFileForRead();
+
+ Path path = new Path(txtOutputPath);
+ final FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) fs.delete(path, true);
+
+ Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
+ Tap source = new Hfs(sourceScheme, parquetInputPath);
+
+ Scheme sinkScheme = new TextLine(new Fields("last_name"));
+ Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+ Pipe assembly = new Pipe("namecp");
+ assembly = new Each(assembly, new ProjectedTupleFunction());
+ Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+ flow.complete();
+ String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+ assertEquals("Practice\nHope\nHorse\n", result);
+ }
+
+ public void testReadWrite(String inputPath) throws Exception {
+ createFileForRead();
+
+ Path path = new Path(txtOutputPath);
+ final FileSystem fs = path.getFileSystem(new Configuration());
+ if (fs.exists(path)) fs.delete(path, true);
+
+ Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
+ Tap source = new Hfs(sourceScheme, inputPath);
+
+ Scheme sinkScheme = new TextLine(new Fields("first", "last"));
+ Tap sink = new Hfs(sinkScheme, txtOutputPath);
+
+ Pipe assembly = new Pipe("namecp");
+ assembly = new Each(assembly, new UnpackTupleFunction());
+ Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
+
+ flow.complete();
+ String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
+ assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
+ }
+
+ private void createFileForRead() throws Exception {
+ final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
+
+ final Configuration conf = new Configuration();
+ final FileSystem fs = fileToCreate.getFileSystem(conf);
+ if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
+
+ TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
+ TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
+ ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+
+ Name n1 = new Name();
+ n1.setFirst_name("Alice");
+ n1.setLast_name("Practice");
+ Name n2 = new Name();
+ n2.setFirst_name("Bob");
+ n2.setLast_name("Hope");
+ Name n3 = new Name();
+ n3.setFirst_name("Charlie");
+ n3.setLast_name("Horse");
+
+ n1.write(protocol);
+ w.write(new BytesWritable(baos.toByteArray()));
+ baos.reset();
+ n2.write(protocol);
+ w.write(new BytesWritable(baos.toByteArray()));
+ baos.reset();
+ n3.write(protocol);
+ w.write(new BytesWritable(baos.toByteArray()));
+ w.close();
+ }
+
+ private static class UnpackTupleFunction extends BaseOperation implements Function {
+ @Override
+ public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+ TupleEntry arguments = functionCall.getArguments();
+ Tuple result = new Tuple();
+
+ Tuple name = new Tuple();
+ name.addString(arguments.getString(0));
+ name.addString(arguments.getString(1));
+
+ result.add(name);
+ functionCall.getOutputCollector().add(result);
+ }
+ }
+
+ private static class ProjectedTupleFunction extends BaseOperation implements Function {
+ @Override
+ public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+ TupleEntry arguments = functionCall.getArguments();
+ Tuple result = new Tuple();
+
+ Tuple name = new Tuple();
+ name.addString(arguments.getString(0));
+// name.addString(arguments.getString(1));
+
+ result.add(name);
+ functionCall.getOutputCollector().add(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/resources/names.txt
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/test/resources/names.txt b/parquet-cascading-common23/src/test/resources/names.txt
new file mode 100644
index 0000000..e2d0408
--- /dev/null
+++ b/parquet-cascading-common23/src/test/resources/names.txt
@@ -0,0 +1,3 @@
+Alice Practive
+Bob Hope
+Charlie Horse
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading-common23/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-cascading-common23/src/test/thrift/test.thrift b/parquet-cascading-common23/src/test/thrift/test.thrift
new file mode 100644
index 0000000..c58843d
--- /dev/null
+++ b/parquet-cascading-common23/src/test/thrift/test.thrift
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace java org.apache.parquet.thrift.test
+
+struct Name {
+ 1: required string first_name,
+ 2: optional string last_name
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml
index 0cd8588..cabb003 100644
--- a/parquet-cascading/pom.xml
+++ b/parquet-cascading/pom.xml
@@ -103,6 +103,51 @@
<build>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>../parquet-cascading-common23/src/main/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>../parquet-cascading-common23/src/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-test-resource</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>add-test-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+ <directory>../parquet-cascading-common23/src/test/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
@@ -115,6 +160,8 @@
<version>0.1.10</version>
<configuration>
<thriftExecutable>${thrift.executable}</thriftExecutable>
+ <thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
+ <thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
</configuration>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
index ea70d43..b34ee7d 100644
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
import org.apache.parquet.thrift.TBaseRecordConverter;
+@Deprecated // The parquet-cascading module depends on Cascading 2.x and is being superseded by parquet-cascading3 for Cascading 3.x
public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
// In the case of reads, we can read the thrift class from the file metadata
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
index 41b56d0..3b7d715 100644
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
@@ -59,6 +59,7 @@ import static org.apache.parquet.Preconditions.checkNotNull;
* @author Avi Bryant
*/
+@Deprecated // The parquet-cascading module depends on Cascading 2.x and is being superseded by parquet-cascading3 for Cascading 3.x
public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
index 9549ef4..6c34a84 100644
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
+++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java
@@ -47,6 +47,7 @@ import static org.apache.parquet.Preconditions.checkNotNull;
* This is an abstract class; implementations are expected to set up their Input/Output Formats
* correctly in the respective Init methods.
*/
+@Deprecated // The parquet-cascading module depends on Cascading 2.x and is being superseded by parquet-cascading3 for Cascading 3.x
public abstract class ParquetValueScheme<T> extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
public static final class Config<T> implements Serializable {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
deleted file mode 100644
index e3fc3f7..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
-
-import cascading.tuple.Fields;
-
-import java.util.List;
-import java.util.ArrayList;
-
-public class SchemaIntersection {
-
- private final MessageType requestedSchema;
- private final Fields sourceFields;
-
- public SchemaIntersection(MessageType fileSchema, Fields requestedFields) {
- if(requestedFields == Fields.UNKNOWN)
- requestedFields = Fields.ALL;
-
- Fields newFields = Fields.NONE;
- List<Type> newSchemaFields = new ArrayList<Type>();
- int schemaSize = fileSchema.getFieldCount();
-
- for (int i = 0; i < schemaSize; i++) {
- Type type = fileSchema.getType(i);
- Fields name = new Fields(type.getName());
-
- if(requestedFields.contains(name)) {
- newFields = newFields.append(name);
- newSchemaFields.add(type);
- }
- }
-
- this.sourceFields = newFields;
- this.requestedSchema = new MessageType(fileSchema.getName(), newSchemaFields);
- }
-
- public MessageType getRequestedSchema() {
- return requestedSchema;
- }
-
- public Fields getSourceFields() {
- return sourceFields;
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
deleted file mode 100644
index 42a5926..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import java.util.Map;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.commons.lang.StringUtils;
-
-import cascading.tuple.Tuple;
-import cascading.tuple.Fields;
-import cascading.flow.hadoop.util.HadoopUtil;
-
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.cascading.convert.TupleRecordMaterializer;
-
-
-public class TupleReadSupport extends ReadSupport<Tuple> {
- static final String PARQUET_CASCADING_REQUESTED_FIELDS = "parquet.cascading.requested.fields";
-
- static protected Fields getRequestedFields(Configuration configuration) {
- String fieldsString = configuration.get(PARQUET_CASCADING_REQUESTED_FIELDS);
-
- if(fieldsString == null)
- return Fields.ALL;
-
- String[] parts = StringUtils.split(fieldsString, ":");
- if(parts.length == 0)
- return Fields.ALL;
- else
- return new Fields(parts);
- }
-
- static protected void setRequestedFields(JobConf configuration, Fields fields) {
- String fieldsString = StringUtils.join(fields.iterator(), ":");
- configuration.set(PARQUET_CASCADING_REQUESTED_FIELDS, fieldsString);
- }
-
- @Override
- public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
- Fields requestedFields = getRequestedFields(configuration);
- if (requestedFields == null) {
- return new ReadContext(fileSchema);
- } else {
- SchemaIntersection intersection = new SchemaIntersection(fileSchema, requestedFields);
- return new ReadContext(intersection.getRequestedSchema());
- }
- }
-
- @Override
- public RecordMaterializer<Tuple> prepareForRead(
- Configuration configuration,
- Map<String, String> keyValueMetaData,
- MessageType fileSchema,
- ReadContext readContext) {
- MessageType requestedSchema = readContext.getRequestedSchema();
- return new TupleRecordMaterializer(requestedSchema);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
deleted file mode 100644
index 032f534..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import cascading.tuple.TupleEntry;
-import java.util.HashMap;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-/**
- *
- *
- * @author Mickaël Lacour <m....@criteo.com>
- */
-public class TupleWriteSupport extends WriteSupport<TupleEntry> {
-
- private RecordConsumer recordConsumer;
- private MessageType rootSchema;
- public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
-
- @Override
- public String getName() {
- return "cascading";
- }
-
- @Override
- public WriteContext init(Configuration configuration) {
- String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
- rootSchema = MessageTypeParser.parseMessageType(schema);
- return new WriteContext(rootSchema, new HashMap<String, String>());
- }
-
- @Override
- public void prepareForWrite(RecordConsumer recordConsumer) {
- this.recordConsumer = recordConsumer;
- }
-
- @Override
- public void write(TupleEntry record) {
- recordConsumer.startMessage();
- final List<Type> fields = rootSchema.getFields();
-
- for (int i = 0; i < fields.size(); i++) {
- Type field = fields.get(i);
-
- if (record == null || record.getObject(field.getName()) == null) {
- continue;
- }
- recordConsumer.startField(field.getName(), i);
- if (field.isPrimitive()) {
- writePrimitive(record, field.asPrimitiveType());
- } else {
- throw new UnsupportedOperationException("Complex type not implemented");
- }
- recordConsumer.endField(field.getName(), i);
- }
- recordConsumer.endMessage();
- }
-
- private void writePrimitive(TupleEntry record, PrimitiveType field) {
- switch (field.getPrimitiveTypeName()) {
- case BINARY:
- recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
- break;
- case BOOLEAN:
- recordConsumer.addBoolean(record.getBoolean(field.getName()));
- break;
- case INT32:
- recordConsumer.addInteger(record.getInteger(field.getName()));
- break;
- case INT64:
- recordConsumer.addLong(record.getLong(field.getName()));
- break;
- case DOUBLE:
- recordConsumer.addDouble(record.getDouble(field.getName()));
- break;
- case FLOAT:
- recordConsumer.addFloat(record.getFloat(field.getName()));
- break;
- case FIXED_LEN_BYTE_ARRAY:
- throw new UnsupportedOperationException("Fixed len byte array type not implemented");
- case INT96:
- throw new UnsupportedOperationException("Int96 type not implemented");
- default:
- throw new UnsupportedOperationException(field.getName() + " type not implemented");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
deleted file mode 100644
index 3741165..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading.convert;
-
-import cascading.tuple.Tuple;
-
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.pig.TupleConversionException;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Type.Repetition;
-
-public class TupleConverter extends GroupConverter {
-
- protected Tuple currentTuple;
- private final Converter[] converters;
-
- public TupleConverter(GroupType parquetSchema) {
- int schemaSize = parquetSchema.getFieldCount();
-
- this.converters = new Converter[schemaSize];
- for (int i = 0; i < schemaSize; i++) {
- Type type = parquetSchema.getType(i);
- converters[i] = newConverter(type, i);
- }
- }
-
- private Converter newConverter(Type type, int i) {
- if(!type.isPrimitive()) {
- throw new IllegalArgumentException("cascading can only build tuples from primitive types");
- } else {
- return new TuplePrimitiveConverter(this, i);
- }
- }
-
- @Override
- public Converter getConverter(int fieldIndex) {
- return converters[fieldIndex];
- }
-
- @Override
- final public void start() {
- currentTuple = Tuple.size(converters.length);
- }
-
- @Override
- public void end() {
- }
-
- final public Tuple getCurrentTuple() {
- return currentTuple;
- }
-
- static final class TuplePrimitiveConverter extends PrimitiveConverter {
- private final TupleConverter parent;
- private final int index;
-
- public TuplePrimitiveConverter(TupleConverter parent, int index) {
- this.parent = parent;
- this.index = index;
- }
-
- @Override
- public void addBinary(Binary value) {
- parent.getCurrentTuple().setString(index, value.toStringUsingUTF8());
- }
-
- @Override
- public void addBoolean(boolean value) {
- parent.getCurrentTuple().setBoolean(index, value);
- }
-
- @Override
- public void addDouble(double value) {
- parent.getCurrentTuple().setDouble(index, value);
- }
-
- @Override
- public void addFloat(float value) {
- parent.getCurrentTuple().setFloat(index, value);
- }
-
- @Override
- public void addInt(int value) {
- parent.getCurrentTuple().setInteger(index, value);
- }
-
- @Override
- public void addLong(long value) {
- parent.getCurrentTuple().setLong(index, value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
deleted file mode 100644
index 275e17b..0000000
--- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading.convert;
-
-import cascading.tuple.Tuple;
-import cascading.tuple.Fields;
-
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.GroupType;
-
-public class TupleRecordMaterializer extends RecordMaterializer<Tuple> {
-
- private TupleConverter root;
-
- public TupleRecordMaterializer(GroupType parquetSchema) {
- this.root = new TupleConverter(parquetSchema);
- }
-
- @Override
- public Tuple getCurrentRecord() {
- return root.getCurrentTuple();
- }
-
- @Override
- public GroupConverter getRootConverter() {
- return root;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
index 841314c..e0f33e1 100644
--- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
+++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
@@ -58,8 +58,9 @@ import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
+@Deprecated // The parquet-cascading module depends on Cascading 2.x and is being superseded by parquet-cascading3 for Cascading 3.x
public class TestParquetTBaseScheme {
- final String txtInputPath = "src/test/resources/names.txt";
+ final String txtInputPath = "target/test-classes/names.txt";
final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in";
final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out";
final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out";
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
deleted file mode 100644
index de350dd..0000000
--- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.cascading;
-
-import cascading.flow.Flow;
-import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowConnector;
-import cascading.operation.BaseOperation;
-import cascading.operation.Function;
-import cascading.operation.FunctionCall;
-import cascading.pipe.Each;
-import cascading.pipe.Pipe;
-import cascading.scheme.Scheme;
-import cascading.scheme.hadoop.TextLine;
-import cascading.tap.Tap;
-import cascading.tap.hadoop.Hfs;
-import cascading.tuple.Fields;
-import cascading.tuple.Tuple;
-import cascading.tuple.TupleEntry;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
-import org.apache.parquet.hadoop.util.ContextUtil;
-import org.apache.parquet.thrift.test.Name;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestParquetTupleScheme {
- final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
- final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";
-
- @Test
- public void testReadPattern() throws Exception {
- String sourceFolder = parquetInputPath;
- testReadWrite(sourceFolder);
-
- String sourceGlobPattern = parquetInputPath + "/*";
- testReadWrite(sourceGlobPattern);
-
- String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
- testReadWrite(multiLevelGlobPattern);
- }
-
- @Test
- public void testFieldProjection() throws Exception {
- createFileForRead();
-
- Path path = new Path(txtOutputPath);
- final FileSystem fs = path.getFileSystem(new Configuration());
- if (fs.exists(path)) fs.delete(path, true);
-
- Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
- Tap source = new Hfs(sourceScheme, parquetInputPath);
-
- Scheme sinkScheme = new TextLine(new Fields("last_name"));
- Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
- Pipe assembly = new Pipe("namecp");
- assembly = new Each(assembly, new ProjectedTupleFunction());
- Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
- flow.complete();
- String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
- assertEquals("Practice\nHope\nHorse\n", result);
- }
-
- public void testReadWrite(String inputPath) throws Exception {
- createFileForRead();
-
- Path path = new Path(txtOutputPath);
- final FileSystem fs = path.getFileSystem(new Configuration());
- if (fs.exists(path)) fs.delete(path, true);
-
- Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
- Tap source = new Hfs(sourceScheme, inputPath);
-
- Scheme sinkScheme = new TextLine(new Fields("first", "last"));
- Tap sink = new Hfs(sinkScheme, txtOutputPath);
-
- Pipe assembly = new Pipe("namecp");
- assembly = new Each(assembly, new UnpackTupleFunction());
- Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);
-
- flow.complete();
- String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
- assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
- }
-
- private void createFileForRead() throws Exception {
- final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");
-
- final Configuration conf = new Configuration();
- final FileSystem fs = fileToCreate.getFileSystem(conf);
- if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);
-
- TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
- TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
- ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
- Name n1 = new Name();
- n1.setFirst_name("Alice");
- n1.setLast_name("Practice");
- Name n2 = new Name();
- n2.setFirst_name("Bob");
- n2.setLast_name("Hope");
- Name n3 = new Name();
- n3.setFirst_name("Charlie");
- n3.setLast_name("Horse");
-
- n1.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- baos.reset();
- n2.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- baos.reset();
- n3.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- w.close();
- }
-
- private static class UnpackTupleFunction extends BaseOperation implements Function {
- @Override
- public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
- TupleEntry arguments = functionCall.getArguments();
- Tuple result = new Tuple();
-
- Tuple name = new Tuple();
- name.addString(arguments.getString(0));
- name.addString(arguments.getString(1));
-
- result.add(name);
- functionCall.getOutputCollector().add(result);
- }
- }
-
- private static class ProjectedTupleFunction extends BaseOperation implements Function {
- @Override
- public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
- TupleEntry arguments = functionCall.getArguments();
- Tuple result = new Tuple();
-
- Tuple name = new Tuple();
- name.addString(arguments.getString(0));
-// name.addString(arguments.getString(1));
-
- result.add(name);
- functionCall.getOutputCollector().add(result);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/resources/names.txt
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/resources/names.txt b/parquet-cascading/src/test/resources/names.txt
deleted file mode 100644
index e2d0408..0000000
--- a/parquet-cascading/src/test/resources/names.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-Alice Practive
-Bob Hope
-Charlie Horse
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading/src/test/thrift/test.thrift
deleted file mode 100644
index c58843d..0000000
--- a/parquet-cascading/src/test/thrift/test.thrift
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace java org.apache.parquet.thrift.test
-
-struct Name {
- 1: required string first_name,
- 2: optional string last_name
-}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/REVIEWERS.md
----------------------------------------------------------------------
diff --git a/parquet-cascading3/REVIEWERS.md b/parquet-cascading3/REVIEWERS.md
new file mode 100644
index 0000000..f797235
--- /dev/null
+++ b/parquet-cascading3/REVIEWERS.md
@@ -0,0 +1,27 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+The following reviewers have reviewed the parquet-cascading (pre-Cascading 3.0) project:
+
+| Name | Apache Id | github id |
+|--------------------|------------|-------------|
+| Dmitriy Ryaboy | dvryaboy | dvryaboy |
+| Tianshuo Deng | tianshuo | tsdeng |
+
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml
new file mode 100644
index 0000000..ea552ad
--- /dev/null
+++ b/parquet-cascading3/pom.xml
@@ -0,0 +1,178 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet</artifactId>
+ <relativePath>../pom.xml</relativePath>
+ <version>1.8.2-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>parquet-cascading3</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Parquet Cascading (for Cascading 3.0 onwards)</name>
+ <url>https://parquet.apache.org</url>
+
+ <repositories>
+ <repository>
+ <id>conjars.org</id>
+ <url>http://conjars.org/repo</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-thrift</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>cascading</groupId>
+ <artifactId>cascading-hadoop</artifactId> <!-- built against cascading-hadoop (Hadoop 1), but intended for use with any backend -->
+ <version>${cascading3.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- TEMPORARY until previous.version >= 1.8.2
+
+ (the enforcer checks against the API in 1.7.0; this module did not exist back then, so the check cannot succeed)
+ -->
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <!-- /TEMPORARY -->
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>../parquet-cascading-common23/src/main/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>../parquet-cascading-common23/src/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-test-resource</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>add-test-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+ <directory>../parquet-cascading-common23/src/test/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.thrift.tools</groupId>
+ <artifactId>maven-thrift-plugin</artifactId>
+ <version>0.1.10</version>
+ <configuration>
+ <thriftExecutable>${thrift.executable}</thriftExecutable>
+ <thriftSourceRoot>../parquet-cascading-common23/src/main/thrift</thriftSourceRoot>
+ <thriftTestSourceRoot>../parquet-cascading-common23/src/test/thrift</thriftTestSourceRoot>
+ </configuration>
+ <executions>
+ <execution>
+ <id>thrift-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
new file mode 100644
index 0000000..af04b47
--- /dev/null
+++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.thrift.TBase;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.Tap;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+import org.apache.parquet.hadoop.thrift.TBaseWriteSupport;
+import org.apache.parquet.thrift.TBaseRecordConverter;
+
+public class ParquetTBaseScheme<T extends TBase<?,?>> extends ParquetValueScheme<T> {
+
+ // In the case of reads, we can read the thrift class from the file metadata
+ public ParquetTBaseScheme() {
+ this(new Config<T>());
+ }
+
+ public ParquetTBaseScheme(Class<T> thriftClass) {
+ this(new Config<T>().withRecordClass(thriftClass));
+ }
+
+ public ParquetTBaseScheme(FilterPredicate filterPredicate) {
+ this(new Config<T>().withFilterPredicate(filterPredicate));
+ }
+
+ public ParquetTBaseScheme(FilterPredicate filterPredicate, Class<T> thriftClass) {
+ this(new Config<T>().withRecordClass(thriftClass).withFilterPredicate(filterPredicate));
+ }
+
+ public ParquetTBaseScheme(Config<T> config) {
+ super(config);
+ }
+
+ @Override
+ public void sourceConfInit(FlowProcess<? extends JobConf> fp,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+ super.sourceConfInit(fp, tap, jobConf);
+ jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
+ ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class);
+ ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class);
+ }
+
+ @Override
+ public void sinkConfInit(FlowProcess<? extends JobConf> fp,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+
+ if (this.config.getKlass() == null) {
+ throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor");
+ }
+
+ DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
+ DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class);
+ TBaseWriteSupport.<T>setThriftClass(jobConf, this.config.getKlass());
+ }
+}
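Note on the signature change above: the commit message describes the Cascading 3.0 overloads as moving from Foo<JobConf> to Foo<? extends JobConf>. The following is a minimal, self-contained sketch of what that wildcard buys. Conf, BackendConf and Process are hypothetical stand-ins (not Cascading or Hadoop classes) for JobConf, a backend-specific subclass of it, and FlowProcess.

```java
public class WildcardSketch {
    // Hypothetical stand-in for JobConf.
    static class Conf {
        String name() { return "conf"; }
    }

    // Hypothetical stand-in for a backend-specific JobConf subclass.
    static class BackendConf extends Conf {
        @Override
        String name() { return "backend-conf"; }
    }

    // Hypothetical stand-in for FlowProcess<Config>.
    static class Process<C> {
        private final C config;
        Process(C config) { this.config = config; }
        C getConfigCopy() { return config; }
    }

    // Exact type parameter: only a Process<Conf> is accepted.
    static String describeExact(Process<Conf> p) {
        return p.getConfigCopy().name();
    }

    // Wildcard: any Process whose config type is Conf or a subtype of it.
    static String describeWildcard(Process<? extends Conf> p) {
        Conf c = p.getConfigCopy(); // reading as the upper bound is always safe
        return c.name();
    }

    public static void main(String[] args) {
        Process<Conf> plain = new Process<>(new Conf());
        Process<BackendConf> backend = new Process<>(new BackendConf());

        System.out.println(describeExact(plain));
        System.out.println(describeWildcard(plain));
        System.out.println(describeWildcard(backend));
        // describeExact(backend) would not compile:
        // Process<BackendConf> is not a subtype of Process<Conf>.
    }
}
```

This matches how the schemes in this module only read the configuration from the flow process (for example via getConfigCopy()), which is the case where an upper-bounded wildcard is sufficient.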
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/57694790/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
----------------------------------------------------------------------
diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
new file mode 100644
index 0000000..4532d3b
--- /dev/null
+++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.cascading;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.CompositeTap;
+import cascading.tap.Tap;
+import cascading.tap.TapException;
+import cascading.tap.hadoop.Hfs;
+import cascading.tuple.Fields;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.mapred.Container;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat;
+import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A Cascading Scheme that converts Parquet groups into Cascading tuples.
+ * If you provide it with sourceFields, it will selectively materialize only the columns for those fields.
+ * The names must match the names in the Parquet schema.
+ * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will derive them from the
+ * Parquet schema.
+ * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be
+ * flattened to a top-level field in the Cascading tuple.
+ *
+ * @author Avi Bryant
+ */
+
+public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
+
+ private static final long serialVersionUID = 0L;
+ private String parquetSchema;
+ private final FilterPredicate filterPredicate;
+
+ public ParquetTupleScheme() {
+ super();
+ this.filterPredicate = null;
+ }
+
+ public ParquetTupleScheme(Fields sourceFields) {
+ super(sourceFields);
+ this.filterPredicate = null;
+ }
+
+ public ParquetTupleScheme(FilterPredicate filterPredicate) {
+ this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+ }
+
+ public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) {
+ super(sourceFields);
+ this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+ }
+
+ /**
+ * ParquetTupleScheme constructor to use when the scheme must also act as a sink.
+ *
+ * @param sourceFields used for the reading step
+ * @param sinkFields used for the writing step
+ * @param schema mandatory if you add sinkFields; it must be the
+ * toString() output of a MessageType. This value is parsed when the
+ * Parquet file is created.
+ */
+ public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) {
+ super(sourceFields, sinkFields);
+ parquetSchema = schema;
+ this.filterPredicate = null;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void sourceConfInit(FlowProcess<? extends JobConf> fp,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+
+ if (filterPredicate != null) {
+ ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate);
+ }
+
+ jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
+ ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class);
+ TupleReadSupport.setRequestedFields(jobConf, getSourceFields());
+ }
+
+ @Override
+ public Fields retrieveSourceFields(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
+ MessageType schema = readSchema(flowProcess, tap);
+ SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields());
+
+ setSourceFields(intersection.getSourceFields());
+
+ return getSourceFields();
+ }
+
+ private MessageType readSchema(FlowProcess<? extends JobConf> flowProcess, Tap tap) {
+ try {
+ Hfs hfs;
+
+ if( tap instanceof CompositeTap )
+ hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next();
+ else
+ hfs = (Hfs) tap;
+
+ List<Footer> footers = getFooters(flowProcess, hfs);
+
+ if(footers.isEmpty()) {
+ throw new TapException("Could not read Parquet metadata at " + hfs.getPath());
+ } else {
+ return footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+ }
+ } catch (IOException e) {
+ throw new TapException(e);
+ }
+ }
+
+ private List<Footer> getFooters(FlowProcess<? extends JobConf> flowProcess, Hfs hfs) throws IOException {
+ JobConf jobConf = flowProcess.getConfigCopy();
+ DeprecatedParquetInputFormat format = new DeprecatedParquetInputFormat();
+ format.addInputPath(jobConf, hfs.getPath());
+ return format.getFooters(jobConf);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean source(FlowProcess<? extends JobConf> fp, SourceCall<Object[], RecordReader> sc)
+ throws IOException {
+ Container<Tuple> value = (Container<Tuple>) sc.getInput().createValue();
+ boolean hasNext = sc.getInput().next(null, value);
+ if (!hasNext) { return false; }
+
+ // Skip nulls
+ if (value == null) { return true; }
+
+ sc.getIncomingEntry().setTuple(value.get());
+ return true;
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void sinkConfInit(FlowProcess<? extends JobConf> fp,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+ DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf);
+ jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
+ ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
+ }
+
+ @Override
+ public boolean isSink() {
+ return parquetSchema != null;
+ }
+
+ @Override
+ public void sink(FlowProcess<? extends JobConf> fp, SinkCall<Object[], OutputCollector> sink)
+ throws IOException {
+ TupleEntry tuple = sink.getOutgoingEntry();
+ OutputCollector outputCollector = sink.getOutput();
+ outputCollector.collect(null, tuple);
+ }
+}
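Note on column selection: the class comment above says that supplying sourceFields materializes only those columns, and that omitting them (or using Fields.ALL or Fields.UNKNOWN) takes the fields from the Parquet schema. The sketch below is a simplified model of that rule using plain strings. It is not the module's SchemaIntersection class; the empty list standing in for "all fields" and the silent dropping of unmatched names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ColumnSelectionSketch {
    // Returns the columns to materialize: every schema column when nothing
    // specific was requested (null or empty list, standing in for
    // Fields.ALL / Fields.UNKNOWN), otherwise the requested columns that
    // exist in the schema, kept in schema order.
    static List<String> select(List<String> schemaColumns, List<String> requested) {
        if (requested == null || requested.isEmpty()) {
            return new ArrayList<>(schemaColumns);
        }
        Set<String> wanted = new HashSet<>(requested);
        List<String> selected = new ArrayList<>();
        for (String column : schemaColumns) {
            if (wanted.contains(column)) {
                selected.add(column);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Column names taken from the Name struct in the test thrift file.
        List<String> schema = Arrays.asList("first_name", "last_name");

        System.out.println(select(schema, Arrays.asList("last_name")));
        System.out.println(select(schema, new ArrayList<String>()));
    }
}
```

Because matching is by name, a requested field that is misspelled relative to the Parquet schema simply selects nothing in this model, which is why the names must match.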