You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@parquet.apache.org by ga...@apache.org on 2020/10/05 09:40:00 UTC
[parquet-mr] branch master updated: PARQUET-313: Implement 3 level list writing rule for Parquet-Thrift (#222)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4e3ee PARQUET-313: Implement 3 level list writing rule for Parquet-Thrift (#222)
0a4e3ee is described below
commit 0a4e3eea991f7588c9c5e056e9d7b32a76eed5da
Author: Ashish Singh <as...@pinterest.com>
AuthorDate: Mon Oct 5 02:39:52 2020 -0700
PARQUET-313: Implement 3 level list writing rule for Parquet-Thrift (#222)
---
.../apache/parquet/scrooge/ScroogeReadSupport.java | 16 +-
parquet-thrift/README.md | 33 ++++
.../hadoop/thrift/AbstractThriftWriteSupport.java | 9 +-
.../thrift/ParquetThriftBytesOutputFormat.java | 41 ++++-
.../parquet/hadoop/thrift/TBaseWriteSupport.java | 6 +-
.../hadoop/thrift/ThriftBytesWriteSupport.java | 37 ++++-
.../parquet/hadoop/thrift/ThriftReadSupport.java | 22 ++-
.../hadoop/thrift/ThriftToParquetFileWriter.java | 4 +-
.../parquet/thrift/ParquetWriteProtocol.java | 137 +++++++++++++---
.../parquet/thrift/ThriftSchemaConvertVisitor.java | 44 +++++-
.../parquet/thrift/ThriftSchemaConverter.java | 25 ++-
.../thrift/TestThriftToParquetFileWriter.java | 83 ++++++++--
.../parquet/thrift/TestParquetWriteProtocol.java | 176 ++++++++++++++++++++-
.../thrift/TestThriftParquetReaderWriter.java | 16 +-
14 files changed, 567 insertions(+), 82 deletions(-)
diff --git a/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java b/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java
index 63b9897..286ca0a 100644
--- a/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java
+++ b/parquet-scrooge/src/main/java/org/apache/parquet/scrooge/ScroogeReadSupport.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.scrooge;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.thrift.ThriftSchemaConverter;
@@ -37,8 +38,13 @@ public class ScroogeReadSupport extends ThriftReadSupport{
}
@Override
- protected MessageType getProjectedSchema(FieldProjectionFilter fieldProjectionFilter) {
+ protected MessageType getProjectedSchema(Configuration configuration, FieldProjectionFilter fieldProjectionFilter) {
ThriftType.StructType thriftStruct = new ScroogeStructConverter().convert(thriftClass);
- return new ThriftSchemaConverter(fieldProjectionFilter).convert(thriftStruct);
+ return new ThriftSchemaConverter(configuration, fieldProjectionFilter).convert(thriftStruct);
+ }
+
+ @Deprecated
+ protected MessageType getProjectedSchema(FieldProjectionFilter fieldProjectionFilter) {
+ return getProjectedSchema(new Configuration(), fieldProjectionFilter);
}
}
diff --git a/parquet-thrift/README.md b/parquet-thrift/README.md
new file mode 100644
index 0000000..87713c8
--- /dev/null
+++ b/parquet-thrift/README.md
@@ -0,0 +1,33 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+# Apache Thrift Integration
+
+**Todo:** Add a description and examples of how to use the Parquet-Thrift integration.
+
+## Available options via Hadoop Configuration
+
+### Configuration for reading
+**Todo:** Add read configs
+
+### Configuration for writing
+**Todo:** Add all write configs
+| Name | Type | Description |
+|-----------------------------------------|-----------|----------------------------------------------------------------------|
+| `parquet.thrift.write-three-level-lists`| `boolean` | Write Thrift lists and sets using the 3-level list structure from the [Parquet format specification](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists) instead of the legacy 2-level structure. Defaults to `false`. List elements are still written as `required`. |
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
index d0f9d01..0c3fe44 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/AbstractThriftWriteSupport.java
@@ -40,8 +40,10 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class";
private static final Logger LOG = LoggerFactory.getLogger(AbstractThriftWriteSupport.class);
+ private static Configuration conf;
public static void setGenericThriftClass(Configuration configuration, Class<?> thriftClass) {
+ conf = configuration;
configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName());
}
@@ -84,7 +86,8 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
this.thriftClass = thriftClass;
this.thriftStruct = getThriftStruct();
- this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);
+ ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(conf);
+ this.schema = thriftSchemaConverter.convert(thriftStruct);
final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData();
// adding the Pig schema as it would have been mapped from thrift
@@ -108,6 +111,7 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
@Override
public WriteContext init(Configuration configuration) {
+ conf = configuration;
if (writeContext == null) {
init(getGenericThriftClass(configuration));
}
@@ -117,7 +121,8 @@ public abstract class AbstractThriftWriteSupport<T> extends WriteSupport<T> {
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
- this.parquetWriteProtocol = new ParquetWriteProtocol(recordConsumer, columnIO, thriftStruct);
+ this.parquetWriteProtocol = new ParquetWriteProtocol(
+ conf, recordConsumer, columnIO, thriftStruct);
}
protected abstract StructType getThriftStruct();
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
index dfdb140..9caca5a 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ParquetThriftBytesOutputFormat.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.thrift;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.thrift.TBase;
@@ -55,15 +56,41 @@ public class ParquetThriftBytesOutputFormat extends ParquetOutputFormat<BytesWri
/**
* The buffered implementation will buffer each record and deal with invalid records (more expansive).
* when catching an exception the record can be discarded.
- * The non-buffered implementation will stream field by field. Exceptions are unrecoverable and the file must be closed when an invalid record is written.
+ * The non-buffered implementation will stream field by field. Exceptions are unrecoverable and the file
+ * must be closed when an invalid record is written.
*
+ * @param configuration the Hadoop configuration handed to the underlying {@link ThriftBytesWriteSupport}
* @param protocolFactory the protocol factory to use to read the bytes
- * @param thriftClass thriftClass the class to exctract the schema from
+ * @param thriftClass thriftClass the class to extract the schema from
* @param buffered whether we should buffer each record
* @param errorHandler handle record corruption and schema incompatible exception
*/
- public ParquetThriftBytesOutputFormat(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) {
- super(new ThriftBytesWriteSupport(protocolFactory, thriftClass, buffered, errorHandler));
+ public ParquetThriftBytesOutputFormat(Configuration configuration,
+ TProtocolFactory protocolFactory,
+ Class<? extends TBase<?, ?>> thriftClass,
+ boolean buffered,
+ FieldIgnoredHandler errorHandler) {
+ super(new ThriftBytesWriteSupport(
+ configuration, protocolFactory, thriftClass, buffered, errorHandler));
+ }
+
+ /**
+ * @deprecated Use
+ * {@link #ParquetThriftBytesOutputFormat(Configuration, TProtocolFactory, Class, boolean, FieldIgnoredHandler)}
+ * instead, which takes the {@link Configuration} explicitly; this constructor
+ * passes {@code new Configuration()}.
+ *
+ * @param protocolFactory the protocol factory to use to read the bytes
+ * @param thriftClass thriftClass the class to extract the schema from
+ * @param buffered whether we should buffer each record
+ * @param errorHandler handle record corruption and schema incompatible exception
+ */
+ @Deprecated
+ public ParquetThriftBytesOutputFormat(TProtocolFactory protocolFactory,
+ Class<? extends TBase<?, ?>> thriftClass,
+ boolean buffered,
+ FieldIgnoredHandler errorHandler) {
+ this(new Configuration(), protocolFactory, thriftClass, buffered, errorHandler);
}
}
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
index 56bf299..c1ece9f 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/TBaseWriteSupport.java
@@ -25,7 +25,10 @@ import org.apache.parquet.thrift.struct.ThriftType.StructType;
public class TBaseWriteSupport<T extends TBase<?, ?>> extends AbstractThriftWriteSupport<T> {
+ private static Configuration conf;
+
public static <U extends TBase<?,?>> void setThriftClass(Configuration configuration, Class<U> thriftClass) {
+ conf = configuration;
AbstractThriftWriteSupport.setGenericThriftClass(configuration, thriftClass);
}
@@ -52,7 +55,8 @@ public class TBaseWriteSupport<T extends TBase<?, ?>> extends AbstractThriftWrit
@Override
protected StructType getThriftStruct() {
- return ThriftSchemaConverter.toStructType(thriftClass);
+ ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(conf);
+ return thriftSchemaConverter.toStructType((Class<TBase<?, ?>>)thriftClass);
}
@Override
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
index 78fe2d7..bdb6578 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftBytesWriteSupport.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,14 +79,34 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
private StructType thriftStruct;
private ParquetWriteProtocol parquetWriteProtocol;
private final FieldIgnoredHandler errorHandler;
+ private Configuration configuration;
public ThriftBytesWriteSupport() {
this.buffered = true;
this.errorHandler = null;
}
- public ThriftBytesWriteSupport(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) {
+ /**
+ * @deprecated Use
+ * {@link #ThriftBytesWriteSupport(Configuration, TProtocolFactory, Class, boolean, FieldIgnoredHandler)}
+ * instead; this constructor passes {@code new Configuration()}.
+ */
+ @Deprecated
+ public ThriftBytesWriteSupport(TProtocolFactory protocolFactory,
+ Class<? extends TBase<?, ?>> thriftClass,
+ boolean buffered,
+ FieldIgnoredHandler errorHandler) {
+ this(new Configuration(), protocolFactory, thriftClass, buffered, errorHandler);
+ }
+
+ public ThriftBytesWriteSupport(
+ Configuration configuration,
+ TProtocolFactory protocolFactory,
+ Class<? extends TBase<?, ?>> thriftClass,
+ boolean buffered,
+ FieldIgnoredHandler errorHandler) {
super();
+ this.configuration = configuration;
this.protocolFactory = protocolFactory;
this.thriftClass = thriftClass;
this.buffered = buffered;
@@ -103,6 +123,7 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
@Override
public WriteContext init(Configuration configuration) {
+ this.configuration = configuration;
if (this.protocolFactory == null) {
try {
this.protocolFactory = getTProtocolFactoryClass(configuration).newInstance();
@@ -115,8 +136,9 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
} else {
thriftClass = TBaseWriteSupport.getThriftClass(configuration);
}
- this.thriftStruct = ThriftSchemaConverter.toStructType(thriftClass);
- this.schema = ThriftSchemaConverter.convertWithoutProjection(thriftStruct);
+ ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(this.configuration);
+ thriftStruct = thriftSchemaConverter.toStructType(thriftClass);
+ schema = thriftSchemaConverter.convert(thriftStruct);
if (buffered) {
readToWrite = new BufferedProtocolReadToWrite(thriftStruct, errorHandler);
} else {
@@ -156,7 +178,8 @@ public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
- this.parquetWriteProtocol = new ParquetWriteProtocol(recordConsumer, columnIO, thriftStruct);
+ parquetWriteProtocol = new ParquetWriteProtocol(
+ configuration, recordConsumer, columnIO, thriftStruct);
thriftWriteSupport.prepareForWrite(recordConsumer);
}
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 9b6881e..6bad970 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -178,7 +178,7 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
} else if (projectionFilter != null) {
try {
initThriftClassFromMultipleFiles(context.getKeyValueMetadata(), configuration);
- requestedProjection = getProjectedSchema(projectionFilter);
+ requestedProjection = getProjectedSchema(configuration, projectionFilter);
} catch (ClassNotFoundException e) {
throw new ThriftProjectionException("can not find thriftClass from configuration", e);
}
@@ -189,8 +189,18 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
}
@SuppressWarnings("unchecked")
- protected MessageType getProjectedSchema(FieldProjectionFilter fieldProjectionFilter) {
- return new ThriftSchemaConverter(fieldProjectionFilter).convert((Class<TBase<?, ?>>)thriftClass);
+ protected MessageType getProjectedSchema(Configuration configuration, FieldProjectionFilter
+ fieldProjectionFilter) {
+ return new ThriftSchemaConverter(configuration, fieldProjectionFilter)
+ .convert((Class<TBase<?, ?>>)thriftClass);
+ }
+
+ @Deprecated
+ @SuppressWarnings("unchecked")
+ protected MessageType getProjectedSchema(FieldProjectionFilter
+ fieldProjectionFilter) {
+ return new ThriftSchemaConverter(new Configuration(), fieldProjectionFilter)
+ .convert((Class<TBase<?, ?>>)thriftClass);
}
@SuppressWarnings("unchecked")
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java
index b0ace04..a638715 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftToParquetFileWriter.java
@@ -93,7 +93,9 @@ public class ThriftToParquetFileWriter implements Closeable {
boolean buffered,
FieldIgnoredHandler errorHandler) throws IOException, InterruptedException {
this.taskAttemptContext = taskAttemptContext;
- this.recordWriter = new ParquetThriftBytesOutputFormat(protocolFactory, thriftClass, buffered, errorHandler).getRecordWriter(taskAttemptContext, fileToCreate);
+ this.recordWriter = new ParquetThriftBytesOutputFormat(
+ taskAttemptContext.getConfiguration(), protocolFactory, thriftClass, buffered, errorHandler)
+ .getRecordWriter(taskAttemptContext, fileToCreate);
}
/**
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
index 8755ee4..ba48b37 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@ package org.apache.parquet.thrift;
import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TList;
@@ -51,6 +52,11 @@ import org.slf4j.LoggerFactory;
public class ParquetWriteProtocol extends ParquetProtocol {
+ public static final String WRITE_THREE_LEVEL_LISTS = "parquet.thrift.write-three-level-lists";
+ public static final boolean WRITE_THREE_LEVEL_LISTS_DEFAULT = false;
+
+ private boolean writeThreeLevelList = WRITE_THREE_LEVEL_LISTS_DEFAULT;
+
interface Events {
public void start();
@@ -98,13 +104,46 @@ public class ParquetWriteProtocol extends ParquetProtocol {
}
- class ListWriteProtocol extends FieldBaseWriteProtocol {
+ abstract class ListWriteProtocol extends FieldBaseWriteProtocol {
+ protected int size;
+
+ public ListWriteProtocol(Events returnClause) {
+ super(returnClause);
+ }
+
+ abstract protected void startListWrapper();
+ abstract protected void endListWrapper();
+
+ @Override
+ public void writeListBegin(TList list) throws TException {
+ size = list.size;
+ startListWrapper();
+ }
+
+ @Override
+ public void writeListEnd() throws TException {
+ endListWrapper();
+ }
+
+ @Override
+ public void writeSetBegin(TSet set) throws TException {
+ size = set.size;
+ startListWrapper();
+ }
+
+ @Override
+ public void writeSetEnd() throws TException {
+ endListWrapper();
+ }
+
+ }
+
+ class TwoLevelListWriteProtocol extends ListWriteProtocol {
private ColumnIO listContent;
private TProtocol contentProtocol;
- private int size;
- public ListWriteProtocol(GroupColumnIO columnIO, ThriftField values, Events returnClause) {
+ public TwoLevelListWriteProtocol(GroupColumnIO columnIO, ThriftField values, Events returnClause) {
super(returnClause);
this.listContent = columnIO.getChild(0);
this.contentProtocol = getProtocol(values, listContent, new Events() {
@@ -117,14 +156,15 @@ public class ParquetWriteProtocol extends ParquetProtocol {
public void end() {
++ consumedRecords;
if (consumedRecords == size) {
- currentProtocol = ListWriteProtocol.this;
+ currentProtocol = TwoLevelListWriteProtocol.this;
consumedRecords = 0;
}
}
});
}
- private void startListWrapper() {
+ @Override
+ protected void startListWrapper() {
start();
recordConsumer.startGroup();
if (size > 0) {
@@ -133,36 +173,67 @@ public class ParquetWriteProtocol extends ParquetProtocol {
}
}
- private void endListWrapper() {
+ @Override
+ protected void endListWrapper() {
if (size > 0) {
recordConsumer.endField(listContent.getType().getName(), 0);
}
recordConsumer.endGroup();
end();
}
+ }
- @Override
- public void writeListBegin(TList list) throws TException {
- size = list.size;
- startListWrapper();
- }
+ class ThreeLevelListWriteProtocol extends ListWriteProtocol {
+ private GroupColumnIO listContent;
+ private ColumnIO elementContent;
+ private TProtocol contentProtocol;
- @Override
- public void writeListEnd() throws TException {
- endListWrapper();
+ public ThreeLevelListWriteProtocol(GroupColumnIO columnIO, ThriftField values,
+ Events returnClause) {
+ super(returnClause);
+ this.listContent = (GroupColumnIO) columnIO.getChild(0);
+ this.elementContent = listContent.getChild(0);
+ this.contentProtocol = getProtocol(values, elementContent, new Events() {
+ int consumedRecords = 0;
+ @Override
+ public void start() {
+ recordConsumer.startGroup();
+ recordConsumer.startField("element", 0);
+ }
+
+ @Override
+ public void end() {
+ recordConsumer.endField("element", 0);
+ recordConsumer.endGroup();
+ ++ consumedRecords;
+ if (consumedRecords == size) {
+ currentProtocol = ThreeLevelListWriteProtocol.this;
+ consumedRecords = 0;
+ } else {
+ currentProtocol = contentProtocol;
+ }
+ }
+ });
}
@Override
- public void writeSetBegin(TSet set) throws TException {
- size = set.size;
- startListWrapper();
+ protected void startListWrapper() {
+ start();
+ recordConsumer.startGroup();
+ if (size > 0) {
+ recordConsumer.startField("list", 0);
+ currentProtocol = contentProtocol;
+ }
}
@Override
- public void writeSetEnd() throws TException {
- endListWrapper();
+ protected void endListWrapper() {
+ if (size > 0) {
+ recordConsumer.endField("list", 0);
+ }
+ recordConsumer.endGroup();
+ end();
}
-
}
class MapWriteProtocol extends FieldBaseWriteProtocol {
@@ -427,6 +498,16 @@ public class ParquetWriteProtocol extends ParquetProtocol {
return "<TMap keyType:" + map.keyType + " valueType:" + map.valueType + " size:" + map.size + ">";
}
+ public ParquetWriteProtocol(
+ Configuration configuration, RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) {
+ this.recordConsumer = recordConsumer;
+ if (configuration != null) {
+ this.writeThreeLevelList = configuration.getBoolean(
+ WRITE_THREE_LEVEL_LISTS, WRITE_THREE_LEVEL_LISTS_DEFAULT);
+ }
+ this.currentProtocol = new MessageWriteProtocol(schema, thriftType);
+ }
+
public ParquetWriteProtocol(RecordConsumer recordConsumer, MessageColumnIO schema, StructType thriftType) {
this.currentProtocol = new MessageWriteProtocol(schema, thriftType);
this.recordConsumer = recordConsumer;
@@ -677,10 +758,10 @@ public class ParquetWriteProtocol extends ParquetProtocol {
p = new MapWriteProtocol((GroupColumnIO)columnIO, (MapType)type, returnClause);
break;
case SET:
- p = new ListWriteProtocol((GroupColumnIO)columnIO, ((SetType)type).getValues(), returnClause);
+ p = getListWriteProtocol((GroupColumnIO) columnIO, ((SetType) type).getValues(), returnClause);
break;
case LIST:
- p = new ListWriteProtocol((GroupColumnIO)columnIO, ((ListType)type).getValues(), returnClause);
+ p = getListWriteProtocol((GroupColumnIO) columnIO, ((ListType) type).getValues(), returnClause);
break;
case ENUM:
p = new EnumWriteProtocol((PrimitiveColumnIO)columnIO, (EnumType)type, returnClause);
@@ -688,4 +769,10 @@ public class ParquetWriteProtocol extends ParquetProtocol {
}
return p;
}
+
+ private TProtocol getListWriteProtocol(GroupColumnIO columnIO, ThriftField values, Events returnClause) {
+ return writeThreeLevelList ?
+ new ThreeLevelListWriteProtocol(columnIO, values, returnClause):
+ new TwoLevelListWriteProtocol(columnIO, values, returnClause);
+ }
}
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index c256880..83d6dd5 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ShouldNeverHappenException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -53,6 +54,7 @@ import org.apache.parquet.thrift.struct.ThriftType.StringType;
import org.apache.parquet.thrift.struct.ThriftType.StructType;
import org.apache.parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
+import static org.apache.parquet.schema.ConversionPatterns.listOfElements;
import static org.apache.parquet.schema.ConversionPatterns.listType;
import static org.apache.parquet.schema.ConversionPatterns.mapType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
@@ -76,22 +78,39 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
private final boolean doProjection;
private final boolean keepOneOfEachUnion;
- private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection, boolean keepOneOfEachUnion) {
+ private final boolean writeThreeLevelList;
+
+ private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection,
+ boolean keepOneOfEachUnion, Configuration configuration) {
this.fieldProjectionFilter = Objects.requireNonNull(fieldProjectionFilter,
- "fieldProjectionFilter cannot be null");
+ "fieldProjectionFilter cannot be null");
this.doProjection = doProjection;
this.keepOneOfEachUnion = keepOneOfEachUnion;
+ if (configuration != null) {
+ this.writeThreeLevelList = configuration.getBoolean(
+ ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS,
+ ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS_DEFAULT);
+ } else {
+ writeThreeLevelList = ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS_DEFAULT;
+ }
+ }
+
+ private ThriftSchemaConvertVisitor(FieldProjectionFilter fieldProjectionFilter, boolean doProjection,
+ boolean keepOneOfEachUnion) {
+ this(fieldProjectionFilter, doProjection, keepOneOfEachUnion, new Configuration());
}
@Deprecated
public static MessageType convert(StructType struct, FieldProjectionFilter filter) {
- return convert(struct, filter, true);
+ return convert(struct, filter, true, new Configuration());
}
- public static MessageType convert(StructType struct, FieldProjectionFilter filter, boolean keepOneOfEachUnion) {
+ public static MessageType convert(StructType struct, FieldProjectionFilter filter, boolean keepOneOfEachUnion,
+ Configuration conf) {
State state = new State(new FieldsPath(), REPEATED, "ParquetSchema");
- ConvertedField converted = struct.accept(new ThriftSchemaConvertVisitor(filter, true, keepOneOfEachUnion), state);
+ ConvertedField converted = struct.accept(
+ new ThriftSchemaConvertVisitor(filter, true, keepOneOfEachUnion, conf), state);
if (!converted.isKeep()) {
throw new ThriftProjectionException("No columns have been selected");
@@ -178,7 +197,12 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
}
private ConvertedField visitListLike(ThriftField listLike, State state, boolean isSet) {
- State childState = new State(state.path, REPEATED, state.name + "_tuple");
+ State childState;
+ if (writeThreeLevelList) {
+ childState = new State(state.path, REQUIRED, "element");
+ } else {
+ childState = new State(state.path, REPEATED, state.name + "_tuple");
+ }
ConvertedField converted = listLike.getType().accept(this, childState);
@@ -194,7 +218,13 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
}
}
- return new Keep(state.path, listType(state.repetition, state.name, converted.asKeep().getType()));
+ if (writeThreeLevelList) {
+ return new Keep(
+ state.path, listOfElements(state.repetition, state.name, converted.asKeep().getType()));
+ } else {
+ return new Keep(
+ state.path, listType(state.repetition, state.name, converted.asKeep().getType()));
+ }
}
return new Drop(state.path);
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
index 2cb061b..61cfd4c 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConverter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,8 @@ import com.twitter.elephantbird.thrift.TStructDescriptor;
import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
import java.util.HashSet;
import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
import org.apache.thrift.TEnum;
import org.apache.thrift.TUnion;
@@ -50,10 +52,23 @@ import static org.apache.parquet.schema.Type.Repetition.REPEATED;
public class ThriftSchemaConverter {
private final FieldProjectionFilter fieldProjectionFilter;
+ // Forwarded to ThriftSchemaConvertVisitor by convert(StructType). Defaults to a fresh
+ // Configuration (the same default convertWithoutProjection supplies) so the constructors
+ // that never assign it do not hand the visitor a null.
+ private Configuration conf = new Configuration();
+
public ThriftSchemaConverter() {
this(FieldProjectionFilter.ALL_COLUMNS);
}
+ /**
+ * Creates a converter that keeps all columns and forwards the given configuration
+ * (for example ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS) to the schema visitor.
+ *
+ * @param configuration settings used during schema conversion
+ */
+ public ThriftSchemaConverter(Configuration configuration) {
+ this();
+ conf = configuration;
+ }
+
+ /**
+ * Creates a converter with a field projection that forwards the given configuration
+ * to the schema visitor.
+ *
+ * @param configuration settings used during schema conversion
+ * @param fieldProjectionFilter selects which Thrift fields are kept
+ */
+ public ThriftSchemaConverter(
+ Configuration configuration, FieldProjectionFilter fieldProjectionFilter) {
+ this(fieldProjectionFilter);
+ conf = configuration;
+ }
+
public ThriftSchemaConverter(FieldProjectionFilter fieldProjectionFilter) {
this.fieldProjectionFilter = fieldProjectionFilter;
}
@@ -72,7 +87,7 @@ public class ThriftSchemaConverter {
* @return the struct as a Parquet message type
*/
public MessageType convert(StructType struct) {
- MessageType messageType = ThriftSchemaConvertVisitor.convert(struct, fieldProjectionFilter, true);
+ MessageType messageType = ThriftSchemaConvertVisitor.convert(struct, fieldProjectionFilter, true, conf); // forward this converter's Configuration to the schema visitor
fieldProjectionFilter.assertNoUnmatchedPatterns();
return messageType;
}
@@ -85,7 +100,7 @@ public class ThriftSchemaConverter {
* @return the struct as a Parquet message type
*/
public static MessageType convertWithoutProjection(StructType struct) {
- return ThriftSchemaConvertVisitor.convert(struct, FieldProjectionFilter.ALL_COLUMNS, false);
+ return ThriftSchemaConvertVisitor.convert(struct, FieldProjectionFilter.ALL_COLUMNS, false, new Configuration()); // static: no instance Configuration exists here, so a default one is supplied
}
public static <T extends TBase<?,?>> StructOrUnionType structOrUnionType(Class<T> klass) {
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
index b8ff23c..e96b226 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
@@ -32,6 +32,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.thrift.ParquetWriteProtocol;
import org.apache.parquet.thrift.test.RequiredPrimitiveFixture;
import org.apache.parquet.thrift.test.TestListsInMap;
@@ -79,7 +80,7 @@ public class TestThriftToParquetFileWriter {
"bob.roberts@example.com",
Arrays.asList(new PhoneNumber("1234567890")))));
- final Path fileToCreate = createFile(a);
+ final Path fileToCreate = createFile(new Configuration(), a);
ParquetReader<Group> reader = createRecordReader(fileToCreate);
@@ -109,7 +110,8 @@ public class TestThriftToParquetFileWriter {
boolStats.setMinMax(false, true);
//write rows to a file
- Path p = createFile(new RequiredPrimitiveFixture(false, (byte)32, (short)32, 2, 90l, -15.55d, "as"),
+ Path p = createFile(new Configuration(),
+ new RequiredPrimitiveFixture(false, (byte)32, (short)32, 2, 90l, -15.55d, "as"),
new RequiredPrimitiveFixture(false, (byte)100, (short)100, 100, 287l, -9.0d, "world"),
new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello"));
final Configuration configuration = new Configuration();
@@ -151,7 +153,8 @@ public class TestThriftToParquetFileWriter {
binaryStatsLarge.setMinMax(Binary.fromString("some small string"),
Binary.fromString("some very large string here to test in this function"));
//write rows to a file
- Path p_large = createFile(new RequiredPrimitiveFixture(false, (byte)2, (short)32, -Integer.MAX_VALUE,
+ Path p_large = createFile(new Configuration(),
+ new RequiredPrimitiveFixture(false, (byte)2, (short)32, -Integer.MAX_VALUE,
-Long.MAX_VALUE, -Double.MAX_VALUE, "some small string"),
new RequiredPrimitiveFixture(false, (byte)100, (short)100, Integer.MAX_VALUE,
Long.MAX_VALUE, Double.MAX_VALUE,
@@ -201,7 +204,7 @@ public class TestThriftToParquetFileWriter {
final TestMapInList listMap = new TestMapInList("listmap",
Arrays.asList(map1, map2));
- final Path fileToCreate = createFile(listMap);
+ final Path fileToCreate = createFile(new Configuration(), listMap);
ParquetReader<Group> reader = createRecordReader(fileToCreate);
@@ -221,7 +224,7 @@ public class TestThriftToParquetFileWriter {
Map<String, List<String>> map = new HashMap<String,List<String>>();
map.put("key", Arrays.asList("val1","val2"));
final TestListInMap mapList = new TestListInMap("maplist", map);
- final Path fileToCreate = createFile(mapList);
+ final Path fileToCreate = createFile(new Configuration(), mapList);
ParquetReader<Group> reader = createRecordReader(fileToCreate);
@@ -239,7 +242,7 @@ public class TestThriftToParquetFileWriter {
Map<List<String>, List<String>> map = new HashMap<List<String>,List<String>>();
map.put(Arrays.asList("key1","key2"), Arrays.asList("val1","val2"));
final TestListsInMap mapList = new TestListsInMap("maplists", map);
- final Path fileToCreate = createFile(mapList);
+ final Path fileToCreate = createFile(new Configuration(), mapList);
ParquetReader<Group> reader = createRecordReader(fileToCreate);
@@ -256,6 +259,68 @@ public class TestThriftToParquetFileWriter {
}
}
+ @Test
+ public void testWriteFileWithThreeLevelsList()
+ throws IOException, InterruptedException, TException {
+ final AddressBook a = new AddressBook(
+ Arrays.asList(
+ new Person(
+ new Name("Bob", "Roberts"),
+ 0,
+ "bob.roberts@example.com",
+ Arrays.asList(new PhoneNumber("1234567890")))));
+
+ Configuration conf = new Configuration();
+ conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+
+ final Path fileToCreate = createFile(conf, a);
+
+ int i = 0;
+ try (ParquetReader<Group> reader = createRecordReader(fileToCreate)) {
+ Group g;
+ while ((g = reader.read()) != null) {
+ // Three-level layout: persons -> repeated "list" -> "element" (Person); entries are counted on "list".
+ Group persons = g.getGroup("persons", 0);
+ assertEquals(a.persons.size(), persons.getFieldRepetitionCount("list"));
+ assertEquals(a.persons.get(0).email,
+ persons.getGroup("list", 0).getGroup("element", 0).getString("email", 0));
+ ++i;
+ }
+ }
+ assertEquals("read 1 record", 1, i);
+ }
+
+ @Test
+ public void testWriteFileListOfMapWithThreeLevelLists()
+ throws IOException, InterruptedException, TException {
+ Map<String, String> map1 = new HashMap<String,String>();
+ map1.put("key11", "value11");
+ map1.put("key12", "value12");
+ Map<String, String> map2 = new HashMap<String,String>();
+ map2.put("key21", "value21");
+ final TestMapInList listMap = new TestMapInList("listmap",
+ Arrays.asList(map1, map2));
+
+ Configuration conf = new Configuration();
+ conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+
+ final Path fileToCreate = createFile(conf, listMap);
+ int records = 0;
+ try (ParquetReader<Group> reader = createRecordReader(fileToCreate)) {
+ Group g;
+ while ((g = reader.read()) != null) {
+ Group names = g.getGroup("names", 0); // names -> repeated "list" -> "element" (map)
+ assertEquals(listMap.names.size(), names.getFieldRepetitionCount("list"));
+ for (int j = 0; j < listMap.names.size(); j++) {
+ assertEquals(listMap.names.get(j).size(),
+ names.getGroup("list", j).getGroup("element", 0).getFieldRepetitionCount("key_value"));
+ }
+ ++records;
+ }
+ }
+ assertEquals("read 1 record", 1, records); // guards against a vacuous pass on an empty file
+ }
+
private ParquetReader<Group> createRecordReader(Path parquetFilePath) throws IOException {
Configuration configuration = new Configuration(true);
@@ -267,10 +332,10 @@ public class TestThriftToParquetFileWriter {
return new ParquetReader<Group>(parquetFilePath, readSupport);
}
- private <T extends TBase<?,?>> Path createFile(T... tObjs) throws IOException, InterruptedException, TException {
+ private <T extends TBase<?,?>> Path createFile(Configuration conf, T... tObjs)
+ throws IOException, InterruptedException, TException {
final Path fileToCreate = new Path("target/test/TestThriftToParquetFileWriter/"+tObjs[0].getClass()+".parquet");
- LOG.info("File created: {}", fileToCreate.toString());
- Configuration conf = new Configuration();
+ LOG.info("File created: " + fileToCreate.toString());
final FileSystem fs = fileToCreate.getFileSystem(conf);
if (fs.exists(fileToCreate)) {
fs.delete(fileToCreate, true);
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
index f9702c0..1311d76 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
@@ -30,6 +30,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import com.twitter.elephantbird.thrift.test.TestMapInList;
+import com.twitter.elephantbird.thrift.test.TestNameSet;
+import org.apache.hadoop.conf.Configuration;
import org.junit.ComparisonFailure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +51,6 @@ import org.apache.parquet.io.RecordConsumerLoggingWrapper;
import org.apache.parquet.pig.PigSchemaConverter;
import org.apache.parquet.pig.TupleWriteSupport;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.thrift.ParquetWriteProtocol;
-import org.apache.parquet.thrift.ThriftSchemaConverter;
import org.apache.parquet.thrift.struct.ThriftType.StructType;
import com.twitter.data.proto.tutorial.thrift.AddressBook;
@@ -518,9 +519,175 @@ public class TestParquetWriteProtocol {
validateThrift(thriftExpectations, a);
}
+ @Test
+ public void testSetWithTwoLevelList() throws TException {
+ final Set<String> names = new java.util.LinkedHashSet<String>(); // insertion order: the expectations list John before Jack
+ names.add("John");
+ names.add("Jack");
+ final TestNameSet o = new TestNameSet("name", names);
+
+ String[] expectations = {
+ "startMessage()",
+ "startField(name, 0)",
+ "addBinary(name)",
+ "endField(name, 0)",
+ "startField(names, 1)",
+ "startGroup()",
+ "startField(names_tuple, 0)",
+ "addBinary(John)",
+ "addBinary(Jack)",
+ "endField(names_tuple, 0)",
+ "endGroup()",
+ "endField(names, 1)",
+ "endMessage()"};
+ Configuration conf = new Configuration();
+ conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "false");
+ validateThrift(conf, expectations, o);
+ }
+
+ @Test
+ public void testSetWithThreeLevelList() throws TException {
+ final Set<String> names = new java.util.LinkedHashSet<String>(); // insertion order: the expectations list John before Jack
+ names.add("John");
+ names.add("Jack");
+ final TestNameSet o = new TestNameSet("name", names);
+
+ String[] expectations = {
+ "startMessage()",
+ "startField(name, 0)",
+ "addBinary(name)",
+ "endField(name, 0)",
+ "startField(names, 1)",
+ "startGroup()",
+ "startField(list, 0)",
+ "startGroup()",
+ "startField(element, 0)",
+ "addBinary(John)",
+ "endField(element, 0)",
+ "endGroup()",
+ "startGroup()",
+ "startField(element, 0)",
+ "addBinary(Jack)",
+ "endField(element, 0)",
+ "endGroup()",
+ "endField(list, 0)",
+ "endGroup()",
+ "endField(names, 1)",
+ "endMessage()"};
+ Configuration conf = new Configuration();
+ conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+ validateThrift(conf, expectations, o);
+ }
+
+ @Test
+ public void testNameThreeLevelList() throws TException {
+ // A list keeps insertion order, so "John" is written before "Jack".
+ final TestNameList nameList =
+ new TestNameList("name", new ArrayList<String>(Arrays.asList("John", "Jack")));
+ final Configuration threeLevelConf = new Configuration();
+ threeLevelConf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+
+ // Each list entry becomes its own repeated "list" group wrapping an "element" field.
+ final String[] expectedEvents = {
+ "startMessage()",
+ "startField(name, 0)",
+ "addBinary(name)",
+ "endField(name, 0)",
+ "startField(names, 1)",
+ "startGroup()",
+ "startField(list, 0)",
+ "startGroup()",
+ "startField(element, 0)",
+ "addBinary(John)",
+ "endField(element, 0)",
+ "endGroup()",
+ "startGroup()",
+ "startField(element, 0)",
+ "addBinary(Jack)",
+ "endField(element, 0)",
+ "endGroup()",
+ "endField(list, 0)",
+ "endGroup()",
+ "endField(names, 1)",
+ "endMessage()"};
+ validateThrift(threeLevelConf, expectedEvents, nameList);
+ }
+
+ @Test
+ public void testListOfMapThreeLevelList() throws TException {
+ Map<String, String> map1 = new TreeMap<String,String>(); // sorted: the expected events rely on key order
+ map1.put("key11", "value11");
+ map1.put("key12", "value12");
+ Map<String, String> map2 = new TreeMap<String,String>();
+ map2.put("key21", "value21");
+ final TestMapInList listMap = new TestMapInList("listmap",
+ Arrays.asList(map1, map2));
+
+ String[] expectations = {
+ "startMessage()",
+ "startField(name, 0)",
+ "addBinary(listmap)",
+ "endField(name, 0)",
+ "startField(names, 1)",
+ "startGroup()",
+ "startField(list, 0)",
+ "startGroup()",
+ "startField(element, 0)",
+ "startGroup()",
+ "startField(key_value, 0)",
+ "startGroup()",
+ "startField(key, 0)",
+ "addBinary(key11)",
+ "endField(key, 0)",
+ "startField(value, 1)",
+ "addBinary(value11)",
+ "endField(value, 1)",
+ "endGroup()",
+ "startGroup()",
+ "startField(key, 0)",
+ "addBinary(key12)",
+ "endField(key, 0)",
+ "startField(value, 1)",
+ "addBinary(value12)",
+ "endField(value, 1)",
+ "endGroup()",
+ "endField(key_value, 0)",
+ "endGroup()",
+ "endField(element, 0)",
+ "endGroup()",
+ "startGroup()",
+ "startField(element, 0)",
+ "startGroup()",
+ "startField(key_value, 0)",
+ "startGroup()",
+ "startField(key, 0)",
+ "addBinary(key21)",
+ "endField(key, 0)",
+ "startField(value, 1)",
+ "addBinary(value21)",
+ "endField(value, 1)",
+ "endGroup()",
+ "endField(key_value, 0)",
+ "endGroup()",
+ "endField(element, 0)",
+ "endGroup()",
+ "endField(list, 0)",
+ "endGroup()",
+ "endField(names, 1)",
+ "endMessage()"};
+ Configuration conf = new Configuration();
+ conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
+ validateThrift(conf, expectations, listMap);
+ }
+
private void validateThrift(String[] expectations, TBase<?, ?> a)
throws TException {
- final ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter();
+ validateThrift(new Configuration(), expectations, a); // fresh Configuration: WRITE_THREE_LEVEL_LISTS is not set explicitly
+ }
+
+ private void validateThrift(Configuration configuration, String[] expectations, TBase<?, ?> a)
+ throws TException {
+ final ThriftSchemaConverter thriftSchemaConverter = new ThriftSchemaConverter(configuration);
// System.out.println(a);
final Class<TBase<?,?>> class1 = (Class<TBase<?,?>>)a.getClass();
final MessageType schema = thriftSchemaConverter.convert(class1);
@@ -528,7 +695,8 @@ public class TestParquetWriteProtocol {
final StructType structType = thriftSchemaConverter.toStructType(class1);
ExpectationValidatingRecordConsumer recordConsumer = new ExpectationValidatingRecordConsumer(new ArrayDeque<String>(Arrays.asList(expectations)));
final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
- ParquetWriteProtocol p = new ParquetWriteProtocol(new RecordConsumerLoggingWrapper(recordConsumer), columnIO, structType);
+ ParquetWriteProtocol p = new ParquetWriteProtocol(
+ configuration, new RecordConsumerLoggingWrapper(recordConsumer), columnIO, structType);
a.write(p);
}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java
index e23aad7..cfd1a5c 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestThriftParquetReaderWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,7 +39,17 @@ public class TestThriftParquetReaderWriter {
@Test
public void testWriteRead() throws IOException {
+ readWriteTest(true); // round-trip with WRITE_THREE_LEVEL_LISTS = "true"
+ }
+
+ @Test
+ public void testWriteReadTwoLevelList() throws IOException {
+ // Legacy two-level layout: readWriteTest stores "false" under WRITE_THREE_LEVEL_LISTS.
+ readWriteTest(Boolean.FALSE);
+ }
+
+ private void readWriteTest(Boolean useThreeLevelLists) throws IOException {
Configuration configuration = new Configuration();
+ configuration.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, useThreeLevelLists.toString());
Path f = new Path("target/test/TestThriftParquetReaderWriter");
FileSystem fs = f.getFileSystem(configuration);
if (fs.exists(f)) {