You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by al...@apache.org on 2015/03/06 00:22:21 UTC
incubator-parquet-mr git commit: PARQUET-162: ParquetThrift should throw when unrecognized columns are passed to the column projection API
Repository: incubator-parquet-mr
Updated Branches:
refs/heads/master 998d6507e -> 258349426
PARQUET-162: ParquetThrift should throw when unrecognized columns are passed to the column projection API
ParquetThrift should throw when unrecognized columns are passed to the column projection API
Author: Tianshuo Deng <td...@twitter.com>
Closes #123 from tsdeng/throw_when_projection_filter_matches_nothing and squashes the following commits:
12c08da [Tianshuo Deng] make PathGlobPatternStatus static
4360b36 [Tianshuo Deng] fix tests
a74f621 [Tianshuo Deng] clean up test
3c581f3 [Tianshuo Deng] refactor unit test
6a86de7 [Tianshuo Deng] format
bdc625d [Tianshuo Deng] throw when projection filter matches nothing
Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/25834942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/25834942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/25834942
Branch: refs/heads/master
Commit: 258349426eecfbe5c135f91809bae80e60c6db6a
Parents: 998d650
Author: Tianshuo Deng <td...@twitter.com>
Authored: Thu Mar 5 15:22:03 2015 -0800
Committer: Alex Levenson <al...@twitter.com>
Committed: Thu Mar 5 15:22:03 2015 -0800
----------------------------------------------------------------------
.../thrift/ThriftSchemaConvertVisitor.java | 4 +
.../parquet/thrift/ThriftSchemaConverter.java | 12 +-
.../projection/FieldProjectionFilter.java | 46 +++++--
.../thrift/projection/PathGlobPattern.java | 5 +
...stParquetToThriftReadWriteAndProjection.java | 119 ++++++++++---------
.../thrift/TestThriftSchemaConverter.java | 25 +++-
6 files changed, 140 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/25834942/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConvertVisitor.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConvertVisitor.java
index c89cbb6..93624ab 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -56,6 +56,10 @@ import parquet.thrift.struct.ThriftType;
*/
public class ThriftSchemaConvertVisitor implements ThriftType.TypeVisitor {
+ public FieldProjectionFilter getFieldProjectionFilter() {
+ return fieldProjectionFilter;
+ }
+
FieldProjectionFilter fieldProjectionFilter;
Type currentType;
FieldsPath currentFieldPath = new FieldsPath();
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/25834942/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
index 87ef3c1..c03f93e 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
@@ -22,8 +22,10 @@ import com.twitter.elephantbird.thrift.TStructDescriptor;
import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
import org.apache.thrift.TBase;
import org.apache.thrift.TEnum;
-import parquet.schema.*;
+import parquet.schema.MessageType;
import parquet.thrift.projection.FieldProjectionFilter;
+import parquet.thrift.projection.PathGlobPattern;
+import parquet.thrift.projection.ThriftProjectionException;
import parquet.thrift.struct.ThriftField;
import parquet.thrift.struct.ThriftField.Requirement;
import parquet.thrift.struct.ThriftType;
@@ -58,9 +60,17 @@ public class ThriftSchemaConverter {
ThriftSchemaConvertVisitor visitor = new ThriftSchemaConvertVisitor(fieldProjectionFilter);
thriftClass.accept(visitor);
MessageType convertedMessageType = visitor.getConvertedMessageType();
+ checkUnmatchedProjectionFilter(visitor.getFieldProjectionFilter());
return convertedMessageType;
}
+ private void checkUnmatchedProjectionFilter(FieldProjectionFilter filter) {
+ List<PathGlobPattern> unmatched = filter.getUnMatchedPatterns();
+ if (unmatched.size() != 0) {
+ throw new ThriftProjectionException("unmatched projection filters: " + unmatched.toString());
+ }
+ }
+
public ThriftType.StructType toStructType(Class<? extends TBase<?, ?>> thriftClass) {
return new ThriftStructConverter().toStructType(thriftClass);
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/25834942/parquet-thrift/src/main/java/parquet/thrift/projection/FieldProjectionFilter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/projection/FieldProjectionFilter.java b/parquet-thrift/src/main/java/parquet/thrift/projection/FieldProjectionFilter.java
index 348663d..ba63258 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/projection/FieldProjectionFilter.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/projection/FieldProjectionFilter.java
@@ -28,21 +28,43 @@ import java.util.List;
*/
public class FieldProjectionFilter {
public static final String PATTERN_SEPARATOR = ";";
- List<PathGlobPattern> filterPatterns;
+ List<PathGlobPatternStatus> filterPatterns;
+
+ /**
+ * Class for remembering if a glob pattern has matched anything.
+ * If there is an invalid glob pattern that matches nothing, it should throw.
+ */
+ private static class PathGlobPatternStatus {
+ PathGlobPattern pattern;
+ boolean hasMatchingPath = false;
+
+ PathGlobPatternStatus(String pattern) {
+ this.pattern = new PathGlobPattern(pattern);
+ }
+
+ public boolean matches(FieldsPath path) {
+ if (this.pattern.matches(path.toString())) {
+ this.hasMatchingPath = true;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
public FieldProjectionFilter() {
- filterPatterns = new LinkedList<PathGlobPattern>();
+ filterPatterns = new LinkedList<PathGlobPatternStatus>();
}
public FieldProjectionFilter(String filterDescStr) {
- filterPatterns = new LinkedList<PathGlobPattern>();
+ filterPatterns = new LinkedList<PathGlobPatternStatus>();
if (filterDescStr == null || filterDescStr.isEmpty())
return;
String[] rawPatterns = filterDescStr.split(PATTERN_SEPARATOR);
for (String rawPattern : rawPatterns) {
- filterPatterns.add(new PathGlobPattern(rawPattern));
+ filterPatterns.add(new PathGlobPatternStatus(rawPattern));
}
}
@@ -51,17 +73,21 @@ public class FieldProjectionFilter {
return true;
for (int i = 0; i < filterPatterns.size(); i++) {
- if (matchPattern(path, filterPatterns.get(i)))
+
+ if (filterPatterns.get(i).matches(path))
return true;
}
return false;
}
- private boolean matchPattern(FieldsPath path, PathGlobPattern filterPattern) {
- if (filterPattern.matches(path.toString())) {
- return true;
- } else {
- return false;
+ public List<PathGlobPattern> getUnMatchedPatterns() {
+ List<PathGlobPattern> unmatched = new LinkedList<PathGlobPattern>();
+ for (PathGlobPatternStatus p : filterPatterns) {
+ if (!p.hasMatchingPath) {
+ unmatched.add(p.pattern);
+ }
}
+ return unmatched;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/25834942/parquet-thrift/src/main/java/parquet/thrift/projection/PathGlobPattern.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/projection/PathGlobPattern.java b/parquet-thrift/src/main/java/parquet/thrift/projection/PathGlobPattern.java
index 8f1566c..c8ab7d7 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/projection/PathGlobPattern.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/projection/PathGlobPattern.java
@@ -170,6 +170,11 @@ public class PathGlobPattern {
compiled = Pattern.compile(regex.toString());
}
+ @Override
+ public String toString() {
+ return compiled.toString();
+ }
+
/**
* @return true if this is a wildcard pattern (with special chars)
*/
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/25834942/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java b/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java
index e0585e0..a2ad2c5 100644
--- a/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java
+++ b/parquet-thrift/src/test/java/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java
@@ -18,39 +18,31 @@
*/
package parquet.hadoop.thrift;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayOutputStream;
-import java.util.*;
-
+import com.twitter.data.proto.tutorial.thrift.AddressBook;
+import com.twitter.data.proto.tutorial.thrift.Name;
+import com.twitter.data.proto.tutorial.thrift.Person;
+import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.*;
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.junit.Test;
-
import parquet.Log;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.util.ContextUtil;
-
-import com.twitter.data.proto.tutorial.thrift.AddressBook;
-import com.twitter.data.proto.tutorial.thrift.Name;
-import com.twitter.data.proto.tutorial.thrift.Person;
-import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
import parquet.thrift.test.*;
+import java.io.ByteArrayOutputStream;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
public class TestParquetToThriftReadWriteAndProjection {
private static final Log LOG = Log.getLog(TestParquetToThriftReadWriteAndProjection.class);
@@ -71,7 +63,7 @@ public class TestParquetToThriftReadWriteAndProjection {
" }\n" +
"}";
conf.set(ReadSupport.PARQUET_READ_SCHEMA, readProjectionSchema);
- TBase toWrite=new AddressBook(
+ TBase toWrite = new AddressBook(
Arrays.asList(
new Person(
new Name("Bob", "Roberts"),
@@ -79,20 +71,20 @@ public class TestParquetToThriftReadWriteAndProjection {
"bob.roberts@example.com",
Arrays.asList(new PhoneNumber("1234567890")))));
- TBase toRead=new AddressBook(
+ TBase toRead = new AddressBook(
Arrays.asList(
new Person(
new Name("Bob", "Roberts"),
0,
null,
null)));
- shouldDoProjection(conf,toWrite,toRead,AddressBook.class);
+ shouldDoProjection(conf, toWrite, toRead, AddressBook.class);
}
@Test
public void testPullingInRequiredStructWithFilter() throws Exception {
final String projectionFilterDesc = "persons/{id};persons/email";
- TBase toWrite=new AddressBook(
+ TBase toWrite = new AddressBook(
Arrays.asList(
new Person(
new Name("Bob", "Roberts"),
@@ -100,30 +92,34 @@ public class TestParquetToThriftReadWriteAndProjection {
"bob.roberts@example.com",
Arrays.asList(new PhoneNumber("1234567890")))));
- TBase toRead=new AddressBook(
+ //Name is a required field, but is projected out. To make the thrift record pass validation, the name field is filled
+ //with empty string
+ TBase toRead = new AddressBook(
Arrays.asList(
new Person(
new Name("", ""),
0,
"bob.roberts@example.com",
null)));
- shouldDoProjectionWithThriftColumnFilter(projectionFilterDesc,toWrite,toRead,AddressBook.class);
+ shouldDoProjectionWithThriftColumnFilter(projectionFilterDesc, toWrite, toRead, AddressBook.class);
}
@Test
public void testReorderdOptionalFields() throws Exception {
final String projectionFilter = "**";
- StructWithReorderedOptionalFields toWrite = new StructWithReorderedOptionalFields();
+ StructWithReorderedOptionalFields toWrite = new StructWithReorderedOptionalFields();
toWrite.setFieldOne(1);
toWrite.setFieldTwo(2);
toWrite.setFieldThree(3);
- shouldDoProjectionWithThriftColumnFilter(projectionFilter,toWrite,toWrite,StructWithReorderedOptionalFields.class);
+ shouldDoProjectionWithThriftColumnFilter(projectionFilter, toWrite, toWrite, StructWithReorderedOptionalFields.class);
}
@Test
- public void testNotPullInOptionalFields() throws Exception {
- final String projectionFilterDesc = "nomatch";
- TBase toWrite=new AddressBook(
+ public void testProjectOutOptionalFields() throws Exception {
+
+ final String projectionFilterDesc = "persons/name/*";
+
+ TBase toWrite = new AddressBook(
Arrays.asList(
new Person(
new Name("Bob", "Roberts"),
@@ -131,73 +127,82 @@ public class TestParquetToThriftReadWriteAndProjection {
"bob.roberts@example.com",
Arrays.asList(new PhoneNumber("1234567890")))));
- TBase toRead=new AddressBook();
- shouldDoProjectionWithThriftColumnFilter(projectionFilterDesc, toWrite, toRead,AddressBook.class);
+ //emails and phones are optional fields that do not match the projection filter
+ TBase toRead = new AddressBook(
+ Arrays.asList(
+ new Person(
+ new Name("Bob", "Roberts"),
+ 0,
+ null,
+ null))
+ );
+
+ shouldDoProjectionWithThriftColumnFilter(projectionFilterDesc, toWrite, toRead, AddressBook.class);
}
@Test
- public void testPullInRequiredMaps() throws Exception{
- String filter="name";
+ public void testPullInRequiredMaps() throws Exception {
+ String filter = "name";
- Map<String,String> mapValue=new HashMap<String,String>();
- mapValue.put("a","1");
- mapValue.put("b","2");
- RequiredMapFixture toWrite= new RequiredMapFixture(mapValue);
+ Map<String, String> mapValue = new HashMap<String, String>();
+ mapValue.put("a", "1");
+ mapValue.put("b", "2");
+ RequiredMapFixture toWrite = new RequiredMapFixture(mapValue);
toWrite.setName("testName");
- RequiredMapFixture toRead=new RequiredMapFixture(new HashMap<String,String>());
+ RequiredMapFixture toRead = new RequiredMapFixture(new HashMap<String, String>());
toRead.setName("testName");
- shouldDoProjectionWithThriftColumnFilter(filter,toWrite,toRead,RequiredMapFixture.class);
+ shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredMapFixture.class);
}
@Test
- public void testPullInRequiredLists() throws Exception{
- String filter="info";
+ public void testPullInRequiredLists() throws Exception {
+ String filter = "info";
- RequiredListFixture toWrite=new RequiredListFixture(Arrays.asList(new parquet.thrift.test.Name("first_name")));
+ RequiredListFixture toWrite = new RequiredListFixture(Arrays.asList(new parquet.thrift.test.Name("first_name")));
toWrite.setInfo("test_info");
- RequiredListFixture toRead=new RequiredListFixture(new ArrayList<parquet.thrift.test.Name>());
+ RequiredListFixture toRead = new RequiredListFixture(new ArrayList<parquet.thrift.test.Name>());
toRead.setInfo("test_info");
- shouldDoProjectionWithThriftColumnFilter(filter,toWrite,toRead,RequiredListFixture.class);
+ shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredListFixture.class);
}
@Test
- public void testPullInRequiredSets() throws Exception{
- String filter="info";
+ public void testPullInRequiredSets() throws Exception {
+ String filter = "info";
- RequiredSetFixture toWrite=new RequiredSetFixture(new HashSet<parquet.thrift.test.Name>(Arrays.asList(new parquet.thrift.test.Name("first_name"))));
+ RequiredSetFixture toWrite = new RequiredSetFixture(new HashSet<parquet.thrift.test.Name>(Arrays.asList(new parquet.thrift.test.Name("first_name"))));
toWrite.setInfo("test_info");
- RequiredSetFixture toRead=new RequiredSetFixture(new HashSet<parquet.thrift.test.Name>());
+ RequiredSetFixture toRead = new RequiredSetFixture(new HashSet<parquet.thrift.test.Name>());
toRead.setInfo("test_info");
- shouldDoProjectionWithThriftColumnFilter(filter,toWrite,toRead,RequiredSetFixture.class);
+ shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredSetFixture.class);
}
@Test
- public void testPullInPrimitiveValues() throws Exception{
- String filter="info_string";
+ public void testPullInPrimitiveValues() throws Exception {
+ String filter = "info_string";
- RequiredPrimitiveFixture toWrite= new RequiredPrimitiveFixture(true,(byte)2,(short)3,4,(long)5,(double)6.0,"7");
+ RequiredPrimitiveFixture toWrite = new RequiredPrimitiveFixture(true, (byte)2, (short)3, 4, (long)5, (double)6.0, "7");
toWrite.setInfo_string("it's info");
- RequiredPrimitiveFixture toRead= new RequiredPrimitiveFixture(false,(byte)0,(short)0,0,(long)0,(double)0.0,"");
+ RequiredPrimitiveFixture toRead = new RequiredPrimitiveFixture(false, (byte)0, (short)0, 0, (long)0, (double)0.0, "");
toRead.setInfo_string("it's info");
- shouldDoProjectionWithThriftColumnFilter(filter,toWrite,toRead,RequiredPrimitiveFixture.class);
+ shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredPrimitiveFixture.class);
}
- private void shouldDoProjectionWithThriftColumnFilter(String filterDesc,TBase toWrite, TBase toRead,Class<? extends TBase<?,?>> thriftClass) throws Exception {
+ private void shouldDoProjectionWithThriftColumnFilter(String filterDesc, TBase toWrite, TBase toRead, Class<? extends TBase<?, ?>> thriftClass) throws Exception {
Configuration conf = new Configuration();
conf.set(ThriftReadSupport.THRIFT_COLUMN_FILTER_KEY, filterDesc);
- shouldDoProjection(conf,toWrite,toRead,thriftClass);
+ shouldDoProjection(conf, toWrite, toRead, thriftClass);
}
- private <T extends TBase<?,?>> void shouldDoProjection(Configuration conf,T recordToWrite,T exptectedReadResult, Class<? extends TBase<?,?>> thriftClass) throws Exception {
+ private <T extends TBase<?, ?>> void shouldDoProjection(Configuration conf, T recordToWrite, T exptectedReadResult, Class<? extends TBase<?, ?>> thriftClass) throws Exception {
final Path parquetFile = new Path("target/test/TestParquetToThriftReadWriteAndProjection/file.parquet");
final FileSystem fs = parquetFile.getFileSystem(conf);
if (fs.exists(parquetFile)) {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/25834942/parquet-thrift/src/test/java/parquet/thrift/TestThriftSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/parquet/thrift/TestThriftSchemaConverter.java b/parquet-thrift/src/test/java/parquet/thrift/TestThriftSchemaConverter.java
index 8b8dcef..d498e2e 100644
--- a/parquet-thrift/src/test/java/parquet/thrift/TestThriftSchemaConverter.java
+++ b/parquet-thrift/src/test/java/parquet/thrift/TestThriftSchemaConverter.java
@@ -19,6 +19,7 @@
package parquet.thrift;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static parquet.schema.MessageTypeParser.parseMessageType;
import org.apache.thrift.TBase;
@@ -202,9 +203,29 @@ public class TestThriftSchemaConverter {
"}",TestStructInMap.class);
}
+
+ private void shouldThrowWhenProjectionFilterMatchesNothing(String filters, String unmatchedFilter, Class<? extends TBase<?, ?>> thriftClass) {
+ try {
+ getFilteredSchema(filters, thriftClass);
+ } catch (ThriftProjectionException e) {
+ assertEquals("unmatched projection filters: [" + unmatchedFilter + "]", e.getMessage());
+ return;
+ }
+ fail("should throw projection exception when filter matches nothing");
+ }
+
+ @Test
+ public void testThrowWhenProjectionFilterMatchesNothing() {
+ shouldThrowWhenProjectionFilterMatchesNothing("non_existing", "non_existing", TestStructInMap.class);
+ shouldThrowWhenProjectionFilterMatchesNothing("name;non_existing", "non_existing", TestStructInMap.class);
+ shouldThrowWhenProjectionFilterMatchesNothing("**;non_existing", "non_existing", TestStructInMap.class);
+ shouldThrowWhenProjectionFilterMatchesNothing("**;names/non_existing", "names/non_existing", TestStructInMap.class);
+ shouldThrowWhenProjectionFilterMatchesNothing("**;names/non_existing;non_existing", "names/non_existing, non_existing", TestStructInMap.class);
+ }
+
@Test(expected = ThriftProjectionException.class)
public void testProjectOnlyValueInMap() {
- System.out.println(getFilteredSchema("name;names/value/**", TestStructInMap.class));
+ getFilteredSchema("name;names/value/**", TestStructInMap.class);
}
private void shouldGetProjectedSchema(String filterDesc, String expectedSchemaStr, Class<? extends TBase<?,?>> thriftClass) {
@@ -218,13 +239,11 @@ public class TestThriftSchemaConverter {
return new ThriftSchemaConverter(fieldProjectionFilter).convert(thriftClass);
}
-
@Test
public void testToThriftType() throws Exception {
ThriftSchemaConverter schemaConverter = new ThriftSchemaConverter();
final StructType converted = schemaConverter.toStructType(AddressBook.class);
final String json = converted.toJSON();
- System.out.println(json);
final ThriftType fromJSON = StructType.fromJSON(json);
assertEquals(json, fromJSON.toJSON());
}