Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:00 UTC

[03/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
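
For downstream code, the visible effect of this rename is a package change only; class names are unchanged. A minimal before/after import sketch (illustrative, using classes that appear in the diffs below):

    // before the rename
    import parquet.example.data.Group;
    import parquet.hadoop.ParquetReader;
    import parquet.hadoop.example.GroupReadSupport;

    // after PARQUET-23
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;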

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
new file mode 100644
index 0000000..80c4381
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java
@@ -0,0 +1,291 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.example;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.api.DelegatingReadSupport;
+import org.apache.parquet.hadoop.api.DelegatingWriteSupport;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageTypeParser;
+
+public class TestInputOutputFormat {
+  private static final Log LOG = Log.getLog(TestInputOutputFormat.class);
+  final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
+  final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java");
+  final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
+  Job writeJob;
+  Job readJob;
+  private String writeSchema;
+  private String readSchema;
+  private String partialSchema;
+  private Configuration conf;
+
+  private Class<? extends Mapper<?,?,?,?>> readMapperClass;
+  private Class<? extends Mapper<?,?,?,?>> writeMapperClass;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    writeSchema = "message example {\n" +
+            "required int32 line;\n" +
+            "required binary content;\n" +
+            "}";
+
+    readSchema = "message example {\n" +
+            "required int32 line;\n" +
+            "required binary content;\n" +
+            "}";
+
+    partialSchema = "message example {\n" +
+            "required int32 line;\n" +
+            "}";
+
+    readMapperClass = ReadMapper.class;
+    writeMapperClass = WriteMapper.class;
+  }
+
+
+  public static final class MyWriteSupport extends DelegatingWriteSupport<Group> {
+
+    private long count = 0;
+
+    public MyWriteSupport() {
+      super(new GroupWriteSupport());
+    }
+
+    @Override
+    public void write(Group record) {
+      super.write(record);
+      ++ count;
+    }
+
+    @Override
+    public org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext finalizeWrite() {
+      Map<String, String> extraMetadata = new HashMap<String, String>();
+      extraMetadata.put("my.count", String.valueOf(count));
+      return new FinalizedWriteContext(extraMetadata);
+    }
+  }
+
+  public static final class MyReadSupport extends DelegatingReadSupport<Group> {
+    public MyReadSupport() {
+      super(new GroupReadSupport());
+    }
+
+    @Override
+    public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+      Set<String> counts = context.getKeyValueMetadata().get("my.count");
+      assertTrue("counts: " + counts, counts.size() > 0);
+      return super.init(context);
+    }
+  }
+
+  public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
+    private SimpleGroupFactory factory;
+
+    protected void setup(org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Void, Group>.Context context) throws java.io.IOException, InterruptedException {
+      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
+    }
+
+    ;
+
+    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Void, Group>.Context context) throws java.io.IOException, InterruptedException {
+      Group group = factory.newGroup()
+              .append("line", (int) key.get())
+              .append("content", value.toString());
+      context.write(null, group);
+    }
+  }
+
+  public static class WriteMapper extends Mapper<Void, Group, LongWritable, Text> {
+    protected void map(Void key, Group value, Mapper<Void, Group, LongWritable, Text>.Context context) throws IOException, InterruptedException {
+      context.write(new LongWritable(value.getInteger("line", 0)), new Text(value.getString("content", 0)));
+    }
+  }
+  public static class PartialWriteMapper extends Mapper<Void, Group, LongWritable, Text> {
+    protected void map(Void key, Group value, Mapper<Void, Group, LongWritable, Text>.Context context) throws IOException, InterruptedException {
+      context.write(new LongWritable(value.getInteger("line", 0)), new Text("dummy"));
+    }
+  }
+  private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
+    runMapReduceJob(codec, Collections.<String, String>emptyMap());
+  }
+  private void runMapReduceJob(CompressionCodecName codec, Map<String, String> extraConf) throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = new Configuration(this.conf);
+    for (Map.Entry<String, String> entry : extraConf.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
+    fileSystem.delete(parquetPath, true);
+    fileSystem.delete(outputPath, true);
+    {
+      writeJob = new Job(conf, "write");
+      TextInputFormat.addInputPath(writeJob, inputPath);
+      writeJob.setInputFormatClass(TextInputFormat.class);
+      writeJob.setNumReduceTasks(0);
+      ParquetOutputFormat.setCompression(writeJob, codec);
+      ParquetOutputFormat.setOutputPath(writeJob, parquetPath);
+      writeJob.setOutputFormatClass(ParquetOutputFormat.class);
+      writeJob.setMapperClass(readMapperClass);
+
+      ParquetOutputFormat.setWriteSupportClass(writeJob, MyWriteSupport.class);
+      GroupWriteSupport.setSchema(
+              MessageTypeParser.parseMessageType(writeSchema),
+              writeJob.getConfiguration());
+      writeJob.submit();
+      waitForJob(writeJob);
+    }
+    {
+      conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
+      readJob = new Job(conf, "read");
+
+      readJob.setInputFormatClass(ParquetInputFormat.class);
+      ParquetInputFormat.setReadSupportClass(readJob, MyReadSupport.class);
+
+      ParquetInputFormat.setInputPaths(readJob, parquetPath);
+      readJob.setOutputFormatClass(TextOutputFormat.class);
+      TextOutputFormat.setOutputPath(readJob, outputPath);
+      readJob.setMapperClass(writeMapperClass);
+      readJob.setNumReduceTasks(0);
+      readJob.submit();
+      waitForJob(readJob);
+    }
+  }
+
+  private void testReadWrite(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
+    testReadWrite(codec, Collections.<String, String>emptyMap());
+  }
+  private void testReadWrite(CompressionCodecName codec, Map<String, String> conf) throws IOException, ClassNotFoundException, InterruptedException {
+    runMapReduceJob(codec, conf);
+    final BufferedReader in = new BufferedReader(new FileReader(new File(inputPath.toString())));
+    final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
+    String lineIn;
+    String lineOut = null;
+    int lineNumber = 0;
+    while ((lineIn = in.readLine()) != null && (lineOut = out.readLine()) != null) {
+      ++lineNumber;
+      lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
+      assertEquals("line " + lineNumber, lineIn, lineOut);
+    }
+    assertNull("line " + lineNumber, out.readLine());
+    assertNull("line " + lineNumber, lineIn);
+    in.close();
+    out.close();
+  }
+
+  @Test
+  public void testReadWrite() throws IOException, ClassNotFoundException, InterruptedException {
+    // TODO: Lzo requires additional external setup steps so leave it out for now
+    testReadWrite(CompressionCodecName.GZIP);
+    testReadWrite(CompressionCodecName.UNCOMPRESSED);
+    testReadWrite(CompressionCodecName.SNAPPY);
+  }
+
+  @Test
+  public void testReadWriteTaskSideMD() throws IOException, ClassNotFoundException, InterruptedException {
+    testReadWrite(CompressionCodecName.UNCOMPRESSED, new HashMap<String, String>() {{ put("parquet.task.side.metadata", "true"); }});
+  }
+
+  @Test
+  public void testProjection() throws Exception{
+    readSchema=partialSchema;
+    writeMapperClass = PartialWriteMapper.class;
+    runMapReduceJob(CompressionCodecName.GZIP);
+  }
+
+  private static long value(Job job, String groupName, String name) throws Exception {
+    // getGroup moved to AbstractCounters
+    Method getGroup = org.apache.hadoop.mapreduce.Counters.class.getMethod("getGroup", String.class);
+    // CounterGroup changed to an interface
+    Method findCounter = org.apache.hadoop.mapreduce.CounterGroup.class.getMethod("findCounter", String.class);
+    // Counter changed to an interface
+    Method getValue = org.apache.hadoop.mapreduce.Counter.class.getMethod("getValue");
+    CounterGroup group = (CounterGroup) getGroup.invoke(job.getCounters(), groupName);
+    Counter counter = (Counter) findCounter.invoke(group, name);
+    return (Long) getValue.invoke(counter);
+  }
+
+  @Test
+  public void testReadWriteWithCounter() throws Exception {
+    runMapReduceJob(CompressionCodecName.GZIP);
+    assertTrue(value(readJob, "parquet", "bytesread") > 0L);
+    assertTrue(value(readJob, "parquet", "bytestotal") > 0L);
+    assertTrue(value(readJob, "parquet", "bytesread")
+            == value(readJob, "parquet", "bytestotal"));
+            // not testing the time-read counter since it could be zero when the data size is too small
+  }
+
+  @Test
+  public void testReadWriteWithoutCounter() throws Exception {
+    conf.set("parquet.benchmark.time.read", "false");
+    conf.set("parquet.benchmark.bytes.total", "false");
+    conf.set("parquet.benchmark.bytes.read", "false");
+    runMapReduceJob(CompressionCodecName.GZIP);
+    assertTrue(value(readJob, "parquet", "bytesread") == 0L);
+    assertTrue(value(readJob, "parquet", "bytestotal") == 0L);
+    assertTrue(value(readJob, "parquet", "timeread") == 0L);
+  }
+
+  private void waitForJob(Job job) throws InterruptedException, IOException {
+    while (!job.isComplete()) {
+      LOG.debug("waiting for job " + job.getJobName());
+      sleep(100);
+    }
+    LOG.info("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
+    if (!job.isSuccessful()) {
+      throw new RuntimeException("job failed " + job.getJobName());
+    }
+  }
+}
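
The test above drives the Group write/read path through ParquetOutputFormat and ParquetInputFormat inside MapReduce jobs. The same GroupWriteSupport/GroupReadSupport pair can also be used directly; below is a minimal standalone sketch under the renamed packages, mirroring the ParquetWriter/ParquetReader usage that appears in PhoneBookWriter further down in this diff (the class name, file path and schema here are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class GroupReadWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example { required int32 line; required binary content; }");
        // The write support reads the schema from the Configuration, as in the job above.
        GroupWriteSupport.setSchema(schema, conf);

        Path file = new Path("target/example.parquet");  // illustrative path

        // Write a single Group with the same write support the MapReduce job uses.
        ParquetWriter<Group> writer =
            new ParquetWriter<Group>(file, conf, new GroupWriteSupport());
        writer.write(new SimpleGroupFactory(schema).newGroup()
            .append("line", 1)
            .append("content", "hello"));
        writer.close();

        // Read it back with the builder used by PhoneBookWriter later in this diff.
        ParquetReader<Group> reader = ParquetReader
            .builder(new GroupReadSupport(), file)
            .withConf(conf)
            .build();
        Group group = reader.read();
        System.out.println(group);
        reader.close();
      }
    }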

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java
new file mode 100644
index 0000000..7b27e19
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/metadata/TestColumnChunkMetaData.java
@@ -0,0 +1,83 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.BinaryStatistics;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+public class TestColumnChunkMetaData {
+
+
+  @Test
+  public void testConversionBig() {
+    long big = (long)Integer.MAX_VALUE + 1;
+
+    ColumnChunkMetaData md = newMD(big);
+    assertTrue(md instanceof IntColumnChunkMetaData);
+    assertEquals(big, md.getFirstDataPageOffset());
+  }
+
+  @Test
+  public void testConversionSmall() {
+    long small = 1;
+
+    ColumnChunkMetaData md = newMD(small);
+    assertTrue(md instanceof IntColumnChunkMetaData);
+    assertEquals(small, md.getFirstDataPageOffset());
+  }
+
+  @Test
+  public void testConversionVeryBig() {
+    long veryBig = (long)Integer.MAX_VALUE * 3;
+
+    ColumnChunkMetaData md = newMD(veryBig);
+    assertTrue(md instanceof LongColumnChunkMetaData);
+    assertEquals(veryBig, md.getFirstDataPageOffset());
+  }
+
+  @Test
+  public void testConversionNeg() {
+    long neg = -1;
+
+    ColumnChunkMetaData md = newMD(neg);
+    assertTrue(md instanceof LongColumnChunkMetaData);
+    assertEquals(neg, md.getFirstDataPageOffset());
+  }
+
+  private ColumnChunkMetaData newMD(long big) {
+    Set<Encoding> e = new HashSet<Encoding>();
+    PrimitiveTypeName t = BINARY;
+    ColumnPath p = ColumnPath.get("foo");
+    CompressionCodecName c = CompressionCodecName.GZIP;
+    BinaryStatistics s = new BinaryStatistics();
+    ColumnChunkMetaData md = ColumnChunkMetaData.get(p, t, c, e, s,
+                                                     big, 0, 0, 0, 0);
+    return md;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestSerializationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestSerializationUtil.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestSerializationUtil.java
new file mode 100644
index 0000000..ec83eae
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestSerializationUtil.java
@@ -0,0 +1,71 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Serialization utils copied from:
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/test/java/com/twitter/elephantbird/util/TestHadoopUtils.java
+ *
+ * TODO: Refactor elephant-bird so that we can depend on utils like this without extra baggage.
+ */
+public class TestSerializationUtil {
+
+  @Test
+  public void testReadWriteObjectToConfAsBase64() throws Exception {
+    Map<Integer, String> anObject = new HashMap<Integer, String>();
+    anObject.put(7, "seven");
+    anObject.put(8, "eight");
+
+    Configuration conf = new Configuration();
+
+    SerializationUtil.writeObjectToConfAsBase64("anobject", anObject, conf);
+    Map<Integer, String> copy = SerializationUtil.readObjectFromConfAsBase64("anobject", conf);
+    assertEquals(anObject, copy);
+
+    try {
+      Set<String> bad = SerializationUtil.readObjectFromConfAsBase64("anobject", conf);
+      fail("This should throw a ClassCastException");
+    } catch (ClassCastException e) {
+
+    }
+
+    conf = new Configuration();
+    Object nullObj = null;
+
+    SerializationUtil.writeObjectToConfAsBase64("anobject", null, conf);
+    Object copyObj = SerializationUtil.readObjectFromConfAsBase64("anobject", conf);
+    assertEquals(nullObj, copyObj);
+  }
+
+  @Test
+  public void readObjectFromConfAsBase64UnsetKey() throws Exception {
+    assertNull(SerializationUtil.readObjectFromConfAsBase64("non-existant-key", new Configuration()));
+  }
+}
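
SerializationUtil itself is not shown in this diff; the test above only exercises its round trip of an arbitrary object through a Configuration value. Below is a minimal sketch of the general technique the method names imply (standard Java serialization encoded as Base64, commons-codec assumed). The helper class and method names are hypothetical, for illustration only, and are not the project's actual implementation:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper: serialize with java.io, encode as Base64, store the
    // string under the given Configuration key; reverse the steps on read.
    public class Base64ConfSketch {

      public static void write(String key, Object obj, Configuration conf) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(obj);
        out.close();
        conf.set(key, Base64.encodeBase64String(bytes.toByteArray()));
      }

      @SuppressWarnings("unchecked")
      public static <T> T read(String key, Configuration conf)
          throws IOException, ClassNotFoundException {
        String encoded = conf.get(key);
        if (encoded == null) {
          return null;  // matches the unset-key behaviour asserted in the test above
        }
        ObjectInputStream in = new ObjectInputStream(
            new ByteArrayInputStream(Base64.decodeBase64(encoded)));
        return (T) in.readObject();  // unchecked cast; a wrong target type fails at the call site
      }
    }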

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java b/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java
deleted file mode 100644
index a4014b4..0000000
--- a/parquet-hadoop/src/test/java/parquet/filter2/compat/TestRowGroupFilter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.compat;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.junit.Test;
-
-import parquet.column.statistics.IntStatistics;
-import parquet.filter2.predicate.Operators.IntColumn;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-import static org.junit.Assert.assertEquals;
-import static parquet.filter2.predicate.FilterApi.eq;
-import static parquet.filter2.predicate.FilterApi.intColumn;
-import static parquet.filter2.predicate.FilterApi.notEq;
-import static parquet.hadoop.TestInputFormat.makeBlockFromStats;
-
-public class TestRowGroupFilter {
-  @Test
-  public void testApplyRowGroupFilters() {
-
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-
-    IntStatistics stats1 = new IntStatistics();
-    stats1.setMinMax(10, 100);
-    stats1.setNumNulls(4);
-    BlockMetaData b1 = makeBlockFromStats(stats1, 301);
-    blocks.add(b1);
-
-    IntStatistics stats2 = new IntStatistics();
-    stats2.setMinMax(8, 102);
-    stats2.setNumNulls(0);
-    BlockMetaData b2 = makeBlockFromStats(stats2, 302);
-    blocks.add(b2);
-
-    IntStatistics stats3 = new IntStatistics();
-    stats3.setMinMax(100, 102);
-    stats3.setNumNulls(12);
-    BlockMetaData b3 = makeBlockFromStats(stats3, 303);
-    blocks.add(b3);
-
-
-    IntStatistics stats4 = new IntStatistics();
-    stats4.setMinMax(0, 0);
-    stats4.setNumNulls(304);
-    BlockMetaData b4 = makeBlockFromStats(stats4, 304);
-    blocks.add(b4);
-
-
-    IntStatistics stats5 = new IntStatistics();
-    stats5.setMinMax(50, 50);
-    stats5.setNumNulls(7);
-    BlockMetaData b5 = makeBlockFromStats(stats5, 305);
-    blocks.add(b5);
-
-    IntStatistics stats6 = new IntStatistics();
-    stats6.setMinMax(0, 0);
-    stats6.setNumNulls(12);
-    BlockMetaData b6 = makeBlockFromStats(stats6, 306);
-    blocks.add(b6);
-
-    MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }");
-    IntColumn foo = intColumn("foo");
-
-    List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), blocks, schema);
-    assertEquals(Arrays.asList(b1, b2, b5), filtered);
-
-    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), blocks, schema);
-    assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
-
-    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), blocks, schema);
-    assertEquals(Arrays.asList(b1, b3, b4, b5, b6), filtered);
-
-    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), blocks, schema);
-    assertEquals(Arrays.asList(b1, b2, b3, b5, b6), filtered);
-
-    filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), blocks, schema);
-    assertEquals(Arrays.asList(b6), filtered);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java
deleted file mode 100644
index 917fb40..0000000
--- a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.example.data.Group;
-import parquet.example.data.simple.SimpleGroup;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.hadoop.ParquetReader;
-import parquet.hadoop.ParquetWriter;
-import parquet.hadoop.example.GroupReadSupport;
-import parquet.hadoop.example.GroupWriteSupport;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-public class PhoneBookWriter {
-  private static final String schemaString =
-      "message user {\n"
-          + "  required int64 id;\n"
-          + "  optional binary name (UTF8);\n"
-          + "  optional group location {\n"
-          + "    optional double lon;\n"
-          + "    optional double lat;\n"
-          + "  }\n"
-          + "  optional group phoneNumbers {\n"
-          + "    repeated group phone {\n"
-          + "      required int64 number;\n"
-          + "      optional binary kind (UTF8);\n"
-          + "    }\n"
-          + "  }\n"
-          + "}\n";
-
-  private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
-
-  public static class Location {
-    private final Double lon;
-    private final Double lat;
-
-    public Location(Double lon, Double lat) {
-      this.lon = lon;
-      this.lat = lat;
-    }
-
-    public Double getLon() {
-      return lon;
-    }
-
-    public Double getLat() {
-      return lat;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      Location location = (Location) o;
-
-      if (lat != null ? !lat.equals(location.lat) : location.lat != null) return false;
-      if (lon != null ? !lon.equals(location.lon) : location.lon != null) return false;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = lon != null ? lon.hashCode() : 0;
-      result = 31 * result + (lat != null ? lat.hashCode() : 0);
-      return result;
-    }
-  }
-
-  public static class PhoneNumber {
-    private final long number;
-    private final String kind;
-
-    public PhoneNumber(long number, String kind) {
-      this.number = number;
-      this.kind = kind;
-    }
-
-    public long getNumber() {
-      return number;
-    }
-
-    public String getKind() {
-      return kind;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      PhoneNumber that = (PhoneNumber) o;
-
-      if (number != that.number) return false;
-      if (kind != null ? !kind.equals(that.kind) : that.kind != null) return false;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = (int) (number ^ (number >>> 32));
-      result = 31 * result + (kind != null ? kind.hashCode() : 0);
-      return result;
-    }
-  }
-
-  public static class User {
-    private final long id;
-    private final String name;
-    private final List<PhoneNumber> phoneNumbers;
-    private final Location location;
-
-    public User(long id, String name, List<PhoneNumber> phoneNumbers, Location location) {
-      this.id = id;
-      this.name = name;
-      this.phoneNumbers = phoneNumbers;
-      this.location = location;
-    }
-
-    public long getId() {
-      return id;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public List<PhoneNumber> getPhoneNumbers() {
-      return phoneNumbers;
-    }
-
-    public Location getLocation() {
-      return location;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      User user = (User) o;
-
-      if (id != user.id) return false;
-      if (location != null ? !location.equals(user.location) : user.location != null) return false;
-      if (name != null ? !name.equals(user.name) : user.name != null) return false;
-      if (phoneNumbers != null ? !phoneNumbers.equals(user.phoneNumbers) : user.phoneNumbers != null) return false;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = (int) (id ^ (id >>> 32));
-      result = 31 * result + (name != null ? name.hashCode() : 0);
-      result = 31 * result + (phoneNumbers != null ? phoneNumbers.hashCode() : 0);
-      result = 31 * result + (location != null ? location.hashCode() : 0);
-      return result;
-    }
-  }
-
-  public static SimpleGroup groupFromUser(User user) {
-    SimpleGroup root = new SimpleGroup(schema);
-    root.append("id", user.getId());
-
-    if (user.getName() != null) {
-      root.append("name", user.getName());
-    }
-
-    if (user.getPhoneNumbers() != null) {
-      Group phoneNumbers = root.addGroup("phoneNumbers");
-      for (PhoneNumber number : user.getPhoneNumbers()) {
-        Group phone = phoneNumbers.addGroup("phone");
-        phone.append("number", number.getNumber());
-        if (number.getKind() != null) {
-          phone.append("kind", number.getKind());
-        }
-      }
-    }
-
-    if (user.getLocation() != null) {
-      Group location = root.addGroup("location");
-      if (user.getLocation().getLon() != null) {
-        location.append("lon", user.getLocation().getLon());
-      }
-      if (user.getLocation().getLat() != null) {
-        location.append("lat", user.getLocation().getLat());
-      }
-    }
-    return root;
-  }
-
-  public static File writeToFile(List<User> users) throws IOException {
-    File f = File.createTempFile("phonebook", ".parquet");
-    f.deleteOnExit();
-    if (!f.delete()) {
-      throw new IOException("couldn't delete tmp file" + f);
-    }
-
-    writeToFile(f, users);
-
-    return f;
-  }
-
-  public static void writeToFile(File f, List<User> users) throws IOException {
-    Configuration conf = new Configuration();
-    GroupWriteSupport.setSchema(schema, conf);
-
-    ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(f.getAbsolutePath()), conf, new GroupWriteSupport());
-    for (User u : users) {
-      writer.write(groupFromUser(u));
-    }
-    writer.close();
-  }
-
-  public static List<Group> readFile(File f, Filter filter) throws IOException {
-    Configuration conf = new Configuration();
-    GroupWriteSupport.setSchema(schema, conf);
-
-    ParquetReader<Group> reader =
-        ParquetReader.builder(new GroupReadSupport(), new Path(f.getAbsolutePath()))
-                     .withConf(conf)
-                     .withFilter(filter)
-                     .build();
-
-    Group current;
-    List<Group> users = new ArrayList<Group>();
-
-    current = reader.read();
-    while (current != null) {
-      users.add(current);
-      current = reader.read();
-    }
-
-    return users;
-  }
-
-  public static void main(String[] args) throws IOException {
-    File f = new File(args[0]);
-    writeToFile(f, TestRecordLevelFilters.makeUsers());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java
deleted file mode 100644
index c112bd9..0000000
--- a/parquet-hadoop/src/test/java/parquet/filter2/recordlevel/TestRecordLevelFilters.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.recordlevel;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import parquet.example.data.Group;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.Operators.BinaryColumn;
-import parquet.filter2.predicate.Operators.DoubleColumn;
-import parquet.filter2.predicate.Operators.LongColumn;
-import parquet.filter2.predicate.Statistics;
-import parquet.filter2.predicate.UserDefinedPredicate;
-import parquet.filter2.recordlevel.PhoneBookWriter.Location;
-import parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
-import parquet.filter2.recordlevel.PhoneBookWriter.User;
-import parquet.io.api.Binary;
-
-import static org.junit.Assert.assertEquals;
-import static parquet.filter2.predicate.FilterApi.and;
-import static parquet.filter2.predicate.FilterApi.binaryColumn;
-import static parquet.filter2.predicate.FilterApi.doubleColumn;
-import static parquet.filter2.predicate.FilterApi.longColumn;
-import static parquet.filter2.predicate.FilterApi.eq;
-import static parquet.filter2.predicate.FilterApi.gt;
-import static parquet.filter2.predicate.FilterApi.not;
-import static parquet.filter2.predicate.FilterApi.notEq;
-import static parquet.filter2.predicate.FilterApi.or;
-import static parquet.filter2.predicate.FilterApi.userDefined;
-
-public class TestRecordLevelFilters {
-
-  public static List<User> makeUsers() {
-    List<User> users = new ArrayList<User>();
-
-    users.add(new User(17, null, null, null));
-
-    users.add(new User(18, "bob", null, null));
-
-    users.add(new User(19, "alice", new ArrayList<PhoneNumber>(), null));
-
-    users.add(new User(20, "thing1", Arrays.asList(new PhoneNumber(5555555555L, null)), null));
-
-    users.add(new User(27, "thing2", Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
-
-    users.add(new User(28, "popular", Arrays.asList(
-        new PhoneNumber(1111111111L, "home"),
-        new PhoneNumber(2222222222L, null),
-        new PhoneNumber(3333333333L, "mobile")
-    ), null));
-
-    users.add(new User(30, null, Arrays.asList(new PhoneNumber(1111111111L, "home")), null));
-
-    for (int i = 100; i < 200; i++) {
-      Location location = null;
-      if (i % 3 == 1) {
-        location = new Location((double)i, (double)i*2);
-      }
-      if (i % 3 == 2) {
-        location = new Location((double)i, null);
-      }
-      users.add(new User(i, "p" + i, Arrays.asList(new PhoneNumber(i, "cell")), location));
-    }
-
-    return users;
-  }
-
-  private static File phonebookFile;
-  private static List<User> users;
-
-  @BeforeClass
-  public static void setup() throws IOException{
-    users = makeUsers();
-    phonebookFile = PhoneBookWriter.writeToFile(users);
-  }
-
-  private static interface UserFilter {
-    boolean keep(User u);
-  }
-
-  private static List<Group> getExpected(UserFilter f) {
-    List<Group> expected = new ArrayList<Group>();
-    for (User u : users) {
-      if (f.keep(u)) {
-        expected.add(PhoneBookWriter.groupFromUser(u));
-      }
-    }
-    return expected;
-  }
-
-  private static void assertFilter(List<Group> found, UserFilter f) {
-    List<Group> expected = getExpected(f);
-    assertEquals(expected.size(), found.size());
-    Iterator<Group> expectedIter = expected.iterator();
-    Iterator<Group> foundIter = found.iterator();
-    while(expectedIter.hasNext()) {
-      assertEquals(expectedIter.next().toString(), foundIter.next().toString());
-    }
-  }
-
-  @Test
-  public void testNoFilter() throws Exception {
-    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.NOOP);
-    assertFilter(found, new UserFilter() {
-      @Override
-      public boolean keep(User u) {
-        return true;
-      }
-    });
-  }
-
-  @Test
-  public void testAllFilter() throws Exception {
-    BinaryColumn name = binaryColumn("name");
-
-    FilterPredicate pred = eq(name, Binary.fromString("no matches"));
-
-    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
-    assertEquals(new ArrayList<Group>(), found);
-  }
-
-  @Test
-  public void testNameNotNull() throws Exception {
-    BinaryColumn name = binaryColumn("name");
-
-    FilterPredicate pred = notEq(name, null);
-
-    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
-
-    assertFilter(found, new UserFilter() {
-      @Override
-      public boolean keep(User u) {
-        return u.getName() != null;
-      }
-    });
-  }
-
-  public static class StartWithP extends UserDefinedPredicate<Binary> {
-
-    @Override
-    public boolean keep(Binary value) {
-      if (value == null) {
-        return false;
-      }
-      return value.toStringUsingUTF8().startsWith("p");
-    }
-
-    @Override
-    public boolean canDrop(Statistics<Binary> statistics) {
-      return false;
-    }
-
-    @Override
-    public boolean inverseCanDrop(Statistics<Binary> statistics) {
-      return false;
-    }
-  }
-  
-  public static class SetInFilter extends UserDefinedPredicate<Long> implements Serializable {
-
-    private HashSet<Long> hSet;
-
-    public SetInFilter(HashSet<Long> phSet) {
-      hSet = phSet;
-    }
-
-    @Override
-    public boolean keep(Long value) {
-      if (value == null) {
-        return false;
-      }
-
-      return hSet.contains(value);
-    }
-
-    @Override
-    public boolean canDrop(Statistics<Long> statistics) {
-      return false;
-    }
-
-    @Override
-    public boolean inverseCanDrop(Statistics<Long> statistics) {
-      return false;
-    }
-  }
-
-  @Test
-  public void testNameNotStartWithP() throws Exception {
-    BinaryColumn name = binaryColumn("name");
-
-    FilterPredicate pred = not(userDefined(name, StartWithP.class));
-
-    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
-
-    assertFilter(found, new UserFilter() {
-      @Override
-      public boolean keep(User u) {
-        return u.getName() == null || !u.getName().startsWith("p");
-      }
-    });
-  }
-  
-  @Test
-  public void testUserDefinedByInstance() throws Exception {
-    LongColumn name = longColumn("id");
-
-    final HashSet<Long> h = new HashSet<Long>();
-    h.add(20L); 
-    h.add(27L);
-    h.add(28L);
-    
-    FilterPredicate pred = userDefined(name, new SetInFilter(h));
-
-    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
-
-    assertFilter(found, new UserFilter() {
-      @Override
-      public boolean keep(User u) {
-        return u != null && h.contains(u.getId());
-      }
-    });
-  }
-
-  @Test
-  public void testComplex() throws Exception {
-    BinaryColumn name = binaryColumn("name");
-    DoubleColumn lon = doubleColumn("location.lon");
-    DoubleColumn lat = doubleColumn("location.lat");
-
-    FilterPredicate pred = or(and(gt(lon, 150.0), notEq(lat, null)), eq(name, Binary.fromString("alice")));
-
-    List<Group> found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred));
-
-    assertFilter(found, new UserFilter() {
-      @Override
-      public boolean keep(User u) {
-        String name = u.getName();
-        Double lat = null;
-        Double lon = null;
-        if (u.getLocation() != null) {
-          lat = u.getLocation().getLat();
-          lon = u.getLocation().getLon();
-        }
-
-        return (lon != null && lon > 150.0 && lat != null) || "alice".equals(name);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java
deleted file mode 100644
index f4e7e91..0000000
--- a/parquet-hadoop/src/test/java/parquet/filter2/statisticslevel/TestStatisticsFilter.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.statisticslevel;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-import org.junit.Test;
-
-import parquet.column.Encoding;
-import parquet.column.statistics.DoubleStatistics;
-import parquet.column.statistics.IntStatistics;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.LogicalInverseRewriter;
-import parquet.filter2.predicate.Operators.DoubleColumn;
-import parquet.filter2.predicate.Operators.IntColumn;
-import parquet.filter2.predicate.Statistics;
-import parquet.filter2.predicate.UserDefinedPredicate;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static parquet.filter2.predicate.FilterApi.and;
-import static parquet.filter2.predicate.FilterApi.doubleColumn;
-import static parquet.filter2.predicate.FilterApi.eq;
-import static parquet.filter2.predicate.FilterApi.gt;
-import static parquet.filter2.predicate.FilterApi.gtEq;
-import static parquet.filter2.predicate.FilterApi.intColumn;
-import static parquet.filter2.predicate.FilterApi.lt;
-import static parquet.filter2.predicate.FilterApi.ltEq;
-import static parquet.filter2.predicate.FilterApi.not;
-import static parquet.filter2.predicate.FilterApi.notEq;
-import static parquet.filter2.predicate.FilterApi.or;
-import static parquet.filter2.predicate.FilterApi.userDefined;
-import static parquet.filter2.statisticslevel.StatisticsFilter.canDrop;
-
-public class TestStatisticsFilter {
-
-  private static ColumnChunkMetaData getIntColumnMeta(IntStatistics stats, long valueCount) {
-    return ColumnChunkMetaData.get(ColumnPath.get("int", "column"),
-        PrimitiveTypeName.INT32,
-        CompressionCodecName.GZIP,
-        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
-        stats,
-        0L, 0L, valueCount, 0L, 0L);
-  }
-
-  private static ColumnChunkMetaData getDoubleColumnMeta(DoubleStatistics stats, long valueCount) {
-    return ColumnChunkMetaData.get(ColumnPath.get("double", "column"),
-        PrimitiveTypeName.DOUBLE,
-        CompressionCodecName.GZIP,
-        new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
-        stats,
-        0L, 0L, valueCount, 0L, 0L);
-  }
-
-  private static final IntColumn intColumn = intColumn("int.column");
-  private static final DoubleColumn doubleColumn = doubleColumn("double.column");
-
-  private static final IntStatistics intStats = new IntStatistics();
-  private static final IntStatistics nullIntStats = new IntStatistics();
-  private static final DoubleStatistics doubleStats = new DoubleStatistics();
-
-  static {
-    intStats.setMinMax(10, 100);
-    doubleStats.setMinMax(10, 100);
-
-    nullIntStats.setMinMax(0, 0);
-    nullIntStats.setNumNulls(177);
-  }
-
-  private static final List<ColumnChunkMetaData> columnMetas = Arrays.asList(
-      getIntColumnMeta(intStats, 177L),
-      getDoubleColumnMeta(doubleStats, 177L));
-
-  private static final List<ColumnChunkMetaData> nullColumnMetas = Arrays.asList(
-      getIntColumnMeta(nullIntStats, 177L), // column of all nulls
-      getDoubleColumnMeta(doubleStats, 177L));
-
-
-  @Test
-  public void testEqNonNull() {
-    assertTrue(canDrop(eq(intColumn, 9), columnMetas));
-    assertFalse(canDrop(eq(intColumn, 10), columnMetas));
-    assertFalse(canDrop(eq(intColumn, 100), columnMetas));
-    assertTrue(canDrop(eq(intColumn, 101), columnMetas));
-
-    // drop columns of all nulls when looking for non-null value
-    assertTrue(canDrop(eq(intColumn, 0), nullColumnMetas));
-  }
-
-  @Test
-  public void testEqNull() {
-    IntStatistics statsNoNulls = new IntStatistics();
-    statsNoNulls.setMinMax(10, 100);
-    statsNoNulls.setNumNulls(0);
-
-    IntStatistics statsSomeNulls = new IntStatistics();
-    statsSomeNulls.setMinMax(10, 100);
-    statsSomeNulls.setNumNulls(3);
-
-    assertTrue(canDrop(eq(intColumn, null), Arrays.asList(
-        getIntColumnMeta(statsNoNulls, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertFalse(canDrop(eq(intColumn, null), Arrays.asList(
-        getIntColumnMeta(statsSomeNulls, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-  }
-
-  @Test
-  public void testNotEqNonNull() {
-    assertFalse(canDrop(notEq(intColumn, 9), columnMetas));
-    assertFalse(canDrop(notEq(intColumn, 10), columnMetas));
-    assertFalse(canDrop(notEq(intColumn, 100), columnMetas));
-    assertFalse(canDrop(notEq(intColumn, 101), columnMetas));
-
-    IntStatistics allSevens = new IntStatistics();
-    allSevens.setMinMax(7, 7);
-    assertTrue(canDrop(notEq(intColumn, 7), Arrays.asList(
-        getIntColumnMeta(allSevens, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-  }
-
-  @Test
-  public void testNotEqNull() {
-    IntStatistics statsNoNulls = new IntStatistics();
-    statsNoNulls.setMinMax(10, 100);
-    statsNoNulls.setNumNulls(0);
-
-    IntStatistics statsSomeNulls = new IntStatistics();
-    statsSomeNulls.setMinMax(10, 100);
-    statsSomeNulls.setNumNulls(3);
-
-    IntStatistics statsAllNulls = new IntStatistics();
-    statsAllNulls.setMinMax(0, 0);
-    statsAllNulls.setNumNulls(177);
-
-    assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
-        getIntColumnMeta(statsNoNulls, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertFalse(canDrop(notEq(intColumn, null), Arrays.asList(
-        getIntColumnMeta(statsSomeNulls, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertTrue(canDrop(notEq(intColumn, null), Arrays.asList(
-        getIntColumnMeta(statsAllNulls, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-  }
-
-  @Test
-  public void testLt() {
-    assertTrue(canDrop(lt(intColumn, 9), columnMetas));
-    assertTrue(canDrop(lt(intColumn, 10), columnMetas));
-    assertFalse(canDrop(lt(intColumn, 100), columnMetas));
-    assertFalse(canDrop(lt(intColumn, 101), columnMetas));
-
-    assertTrue(canDrop(lt(intColumn, 0), nullColumnMetas));
-    assertTrue(canDrop(lt(intColumn, 7), nullColumnMetas));
-  }
-
-  @Test
-  public void testLtEq() {
-    assertTrue(canDrop(ltEq(intColumn, 9), columnMetas));
-    assertFalse(canDrop(ltEq(intColumn, 10), columnMetas));
-    assertFalse(canDrop(ltEq(intColumn, 100), columnMetas));
-    assertFalse(canDrop(ltEq(intColumn, 101), columnMetas));
-
-    assertTrue(canDrop(ltEq(intColumn, 0), nullColumnMetas));
-    assertTrue(canDrop(ltEq(intColumn, 7), nullColumnMetas));
-  }
-
-  @Test
-  public void testGt() {
-    assertFalse(canDrop(gt(intColumn, 9), columnMetas));
-    assertFalse(canDrop(gt(intColumn, 10), columnMetas));
-    assertTrue(canDrop(gt(intColumn, 100), columnMetas));
-    assertTrue(canDrop(gt(intColumn, 101), columnMetas));
-
-    assertTrue(canDrop(gt(intColumn, 0), nullColumnMetas));
-    assertTrue(canDrop(gt(intColumn, 7), nullColumnMetas));
-  }
-
-  @Test
-  public void testGtEq() {
-    assertFalse(canDrop(gtEq(intColumn, 9), columnMetas));
-    assertFalse(canDrop(gtEq(intColumn, 10), columnMetas));
-    assertFalse(canDrop(gtEq(intColumn, 100), columnMetas));
-    assertTrue(canDrop(gtEq(intColumn, 101), columnMetas));
-
-    assertTrue(canDrop(gtEq(intColumn, 0), nullColumnMetas));
-    assertTrue(canDrop(gtEq(intColumn, 7), nullColumnMetas));
-  }
-
-  @Test
-  public void testAnd() {
-    FilterPredicate yes = eq(intColumn, 9);
-    FilterPredicate no = eq(doubleColumn, 50D);
-    assertTrue(canDrop(and(yes, yes), columnMetas));
-    assertTrue(canDrop(and(yes, no), columnMetas));
-    assertTrue(canDrop(and(no, yes), columnMetas));
-    assertFalse(canDrop(and(no, no), columnMetas));
-  }
-
-  @Test
-  public void testOr() {
-    FilterPredicate yes = eq(intColumn, 9);
-    FilterPredicate no = eq(doubleColumn, 50D);
-    assertTrue(canDrop(or(yes, yes), columnMetas));
-    assertFalse(canDrop(or(yes, no), columnMetas));
-    assertFalse(canDrop(or(no, yes), columnMetas));
-    assertFalse(canDrop(or(no, no), columnMetas));
-  }
-
-  public static class SevensAndEightsUdp extends UserDefinedPredicate<Integer> {
-
-    @Override
-    public boolean keep(Integer value) {
-      throw new RuntimeException("this method should not be called");
-    }
-
-    @Override
-    public boolean canDrop(Statistics<Integer> statistics) {
-      return statistics.getMin() == 7 && statistics.getMax() == 7;
-    }
-
-    @Override
-    public boolean inverseCanDrop(Statistics<Integer> statistics) {
-      return statistics.getMin() == 8 && statistics.getMax() == 8;
-    }
-  }
-
-  @Test
-  public void testUdp() {
-    FilterPredicate pred = userDefined(intColumn, SevensAndEightsUdp.class);
-    FilterPredicate invPred = LogicalInverseRewriter.rewrite(not(userDefined(intColumn, SevensAndEightsUdp.class)));
-
-    IntStatistics seven = new IntStatistics();
-    seven.setMinMax(7, 7);
-
-    IntStatistics eight = new IntStatistics();
-    eight.setMinMax(8, 8);
-
-    IntStatistics neither = new IntStatistics();
-    neither.setMinMax(1 , 2);
-
-    assertTrue(canDrop(pred, Arrays.asList(
-        getIntColumnMeta(seven, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertFalse(canDrop(pred, Arrays.asList(
-        getIntColumnMeta(eight, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertFalse(canDrop(pred, Arrays.asList(
-        getIntColumnMeta(neither, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertFalse(canDrop(invPred, Arrays.asList(
-        getIntColumnMeta(seven, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertTrue(canDrop(invPred, Arrays.asList(
-        getIntColumnMeta(eight, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-
-    assertFalse(canDrop(invPred, Arrays.asList(
-        getIntColumnMeta(neither, 177L),
-        getDoubleColumnMeta(doubleStats, 177L))));
-  }
-
-  @Test
-  public void testClearExceptionForNots() {
-    List<ColumnChunkMetaData> columnMetas = Arrays.asList(
-        getDoubleColumnMeta(new DoubleStatistics(), 0L),
-        getIntColumnMeta(new IntStatistics(), 0L));
-
-    FilterPredicate pred = and(not(eq(doubleColumn, 12.0)), eq(intColumn, 17));
-
-    try {
-      canDrop(pred, columnMetas);
-      fail("This should throw");
-    } catch (IllegalArgumentException e) {
-      assertEquals("This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter?"
-          + " not(eq(double.column, 12.0))", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testMissingColumn() {
-    List<ColumnChunkMetaData> columnMetas = Arrays.asList(getIntColumnMeta(new IntStatistics(), 0L));
-    try {
-      canDrop(and(eq(doubleColumn, 12.0), eq(intColumn, 17)), columnMetas);
-      fail("This should throw");
-    } catch (IllegalArgumentException e) {
-      assertEquals("Column double.column not found in schema!", e.getMessage());
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
deleted file mode 100644
index d9a4d6c..0000000
--- a/parquet-hadoop/src/test/java/parquet/format/converter/TestParquetMetadataConverter.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.format.converter;
-
-import static java.util.Collections.emptyList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static parquet.format.CompressionCodec.UNCOMPRESSED;
-import static parquet.format.Type.INT32;
-import static parquet.format.Util.readPageHeader;
-import static parquet.format.Util.writePageHeader;
-import static parquet.format.converter.ParquetMetadataConverter.filterFileMetaData;
-import static parquet.format.converter.ParquetMetadataConverter.getOffset;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import parquet.column.Encoding;
-import parquet.example.Paper;
-import parquet.format.ColumnChunk;
-import parquet.format.ColumnMetaData;
-import parquet.format.ConvertedType;
-import parquet.format.FieldRepetitionType;
-import parquet.format.FileMetaData;
-import parquet.format.PageHeader;
-import parquet.format.PageType;
-import parquet.format.RowGroup;
-import parquet.format.SchemaElement;
-import parquet.format.Type;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-import parquet.schema.Types;
-
-import com.google.common.collect.Lists;
-
-public class TestParquetMetadataConverter {
-
-  @Test
-  public void testPageHeader() throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    PageType type = PageType.DATA_PAGE;
-    int compSize = 10;
-    int uncSize = 20;
-    PageHeader pageHeader = new PageHeader(type, uncSize, compSize);
-    writePageHeader(pageHeader, out);
-    PageHeader readPageHeader = readPageHeader(new ByteArrayInputStream(out.toByteArray()));
-    assertEquals(pageHeader, readPageHeader);
-  }
-
-  @Test
-  public void testSchemaConverter() {
-    ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-    List<SchemaElement> parquetSchema = parquetMetadataConverter.toParquetSchema(Paper.schema);
-    MessageType schema = parquetMetadataConverter.fromParquetSchema(parquetSchema);
-    assertEquals(Paper.schema, schema);
-  }
-
-  @Test
-  public void testSchemaConverterDecimal() {
-    ParquetMetadataConverter converter = new ParquetMetadataConverter();
-    List<SchemaElement> schemaElements = converter.toParquetSchema(
-        Types.buildMessage()
-            .required(PrimitiveTypeName.BINARY)
-                .as(OriginalType.DECIMAL).precision(9).scale(2)
-                .named("aBinaryDecimal")
-            .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(4)
-                .as(OriginalType.DECIMAL).precision(9).scale(2)
-                .named("aFixedDecimal")
-            .named("Message")
-    );
-    List<SchemaElement> expected = Lists.newArrayList(
-        new SchemaElement("Message").setNum_children(2),
-        new SchemaElement("aBinaryDecimal")
-            .setRepetition_type(FieldRepetitionType.REQUIRED)
-            .setType(Type.BYTE_ARRAY)
-            .setConverted_type(ConvertedType.DECIMAL)
-            .setPrecision(9).setScale(2),
-        new SchemaElement("aFixedDecimal")
-            .setRepetition_type(FieldRepetitionType.OPTIONAL)
-            .setType(Type.FIXED_LEN_BYTE_ARRAY)
-            .setType_length(4)
-            .setConverted_type(ConvertedType.DECIMAL)
-            .setPrecision(9).setScale(2)
-    );
-    Assert.assertEquals(expected, schemaElements);
-  }
-
-  @Test
-  public void testEnumEquivalence() {
-    ParquetMetadataConverter c = new ParquetMetadataConverter();
-    for (Encoding encoding : Encoding.values()) {
-      assertEquals(encoding, c.getEncoding(c.getEncoding(encoding)));
-    }
-    for (parquet.format.Encoding encoding : parquet.format.Encoding.values()) {
-      assertEquals(encoding, c.getEncoding(c.getEncoding(encoding)));
-    }
-    for (Repetition repetition : Repetition.values()) {
-      assertEquals(repetition, c.fromParquetRepetition(c.toParquetRepetition(repetition)));
-    }
-    for (FieldRepetitionType repetition : FieldRepetitionType.values()) {
-      assertEquals(repetition, c.toParquetRepetition(c.fromParquetRepetition(repetition)));
-    }
-    for (PrimitiveTypeName primitiveTypeName : PrimitiveTypeName.values()) {
-      assertEquals(primitiveTypeName, c.getPrimitive(c.getType(primitiveTypeName)));
-    }
-    for (Type type : Type.values()) {
-      assertEquals(type, c.getType(c.getPrimitive(type)));
-    }
-    for (OriginalType original : OriginalType.values()) {
-      assertEquals(original, c.getOriginalType(c.getConvertedType(original)));
-    }
-    for (ConvertedType converted : ConvertedType.values()) {
-      assertEquals(converted, c.getConvertedType(c.getOriginalType(converted)));
-    }
-  }
-
-  private FileMetaData metadata(long... sizes) {
-    List<SchemaElement> schema = emptyList();
-    List<RowGroup> rowGroups = new ArrayList<RowGroup>();
-    long offset = 0;
-    for (long size : sizes) {
-      ColumnChunk columnChunk = new ColumnChunk(offset);
-      columnChunk.setMeta_data(new ColumnMetaData(
-          INT32,
-          Collections.<parquet.format.Encoding>emptyList(),
-          Collections.<String>emptyList(),
-          UNCOMPRESSED, 10L, size * 2, size, offset));
-      rowGroups.add(new RowGroup(Arrays.asList(columnChunk), size, 1));
-      offset += size;
-    }
-    return new FileMetaData(1, schema, sizes.length, rowGroups);
-  }
-
-  private FileMetaData filter(FileMetaData md, long start, long end) {
-    return filterFileMetaData(new FileMetaData(md), new ParquetMetadataConverter.RangeMetadataFilter(start, end));
-  }
-
-  private void verifyMD(FileMetaData md, long... offsets) {
-    assertEquals(offsets.length, md.row_groups.size());
-    for (int i = 0; i < offsets.length; i++) {
-      long offset = offsets[i];
-      RowGroup rowGroup = md.getRow_groups().get(i);
-      assertEquals(offset, getOffset(rowGroup));
-    }
-  }
-
-  /**
-   * Verifies that the splits form a partition of the row groups:
-   * each row group is found in exactly one split.
-   * @param md the file metadata to filter
-   * @param splitWidth the width of each split in bytes
-   */
-  private void verifyAllFilters(FileMetaData md, long splitWidth) {
-    Set<Long> offsetsFound = new TreeSet<Long>();
-    for (long start = 0; start < fileSize(md); start += splitWidth) {
-      FileMetaData filtered = filter(md, start, start + splitWidth);
-      for (RowGroup rg : filtered.getRow_groups()) {
-        long o = getOffset(rg);
-        if (offsetsFound.contains(o)) {
-          fail("found the offset twice: " + o);
-        } else {
-          offsetsFound.add(o);
-        }
-      }
-    }
-    if (offsetsFound.size() != md.row_groups.size()) {
-      fail("missing row groups, "
-          + "found: " + offsetsFound
-          + "\nexpected " + md.getRow_groups());
-    }
-  }
-
-  private long fileSize(FileMetaData md) {
-    long size = 0;
-    for (RowGroup rg : md.getRow_groups()) {
-      size += rg.total_byte_size;
-    }
-    return size;
-  }
-
-  @Test
-  public void testFilterMetaData() {
-    verifyMD(filter(metadata(50, 50, 50), 0, 50), 0);
-    verifyMD(filter(metadata(50, 50, 50), 50, 100), 50);
-    verifyMD(filter(metadata(50, 50, 50), 100, 150), 100);
-    // picks up first RG
-    verifyMD(filter(metadata(50, 50, 50), 25, 75), 0);
-    // picks up no RG
-    verifyMD(filter(metadata(50, 50, 50), 26, 75));
-    // picks up second RG
-    verifyMD(filter(metadata(50, 50, 50), 26, 76), 50);
-
-    verifyAllFilters(metadata(50, 50, 50), 10);
-    verifyAllFilters(metadata(50, 50, 50), 51);
-    verifyAllFilters(metadata(50, 50, 50), 25); // corner cases are in the middle
-    verifyAllFilters(metadata(50, 50, 50), 24);
-    verifyAllFilters(metadata(50, 50, 50), 26);
-    verifyAllFilters(metadata(50, 50, 50), 110);
-    verifyAllFilters(metadata(10, 50, 500), 110);
-    verifyAllFilters(metadata(10, 50, 500), 10);
-    verifyAllFilters(metadata(10, 50, 500), 600);
-    verifyAllFilters(metadata(11, 9, 10), 10);
-    verifyAllFilters(metadata(11, 9, 10), 9);
-    verifyAllFilters(metadata(11, 9, 10), 8);
-  }
-
-  @Test
-  public void randomTestFilterMetaData() {
-    // randomized property-based testing
-    // if it fails, add the failing case to testFilterMetaData above
-    Random random = new Random(System.currentTimeMillis());
-    for (int j = 0; j < 100; j++) {
-      long[] rgs = new long[random.nextInt(50)];
-      for (int i = 0; i < rgs.length; i++) {
-        rgs[i] = random.nextInt(10000) + 1; // No empty row groups
-      }
-      int splitSize = random.nextInt(10000);
-      try {
-        verifyAllFilters(metadata(rgs), splitSize);
-      } catch (AssertionError e) {
-        throw (AssertionError) new AssertionError(
-            "fail verifyAllFilters(metadata(" + Arrays.toString(rgs) + "), " + splitSize + ")").initCause(e);
-      }
-    }
-  }
-
-}
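
The range-filter cases above are consistent with a simple rule: a row group is kept by the half-open window [start, end) that contains its midpoint, so cutting the file into non-overlapping windows assigns every row group exactly once, which is what verifyAllFilters checks. A self-contained sketch of that arithmetic (an illustration of the invariant only, not the converter's implementation):

public class MidpointAssignmentSketch {
  public static void main(String[] args) {
    long[] rowGroupSizes = {11, 9, 10};   // one of the corner cases exercised above
    long splitWidth = 10;

    long offset = 0;
    for (long size : rowGroupSizes) {
      long midpoint = offset + size / 2;
      long splitIndex = midpoint / splitWidth; // the single window that keeps this row group
      System.out.println("row group at offset " + offset
          + " (size " + size + ") -> split #" + splitIndex);
      offset += size;
    }
  }
}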

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
deleted file mode 100644
index 682501d..0000000
--- a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedInputFormatTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.junit.Before;
-import org.junit.Test;
-import parquet.example.data.Group;
-import parquet.example.data.simple.SimpleGroupFactory;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.example.ExampleOutputFormat;
-import parquet.hadoop.example.GroupReadSupport;
-import parquet.hadoop.example.GroupWriteSupport;
-import parquet.hadoop.mapred.Container;
-import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
-import parquet.schema.MessageTypeParser;
-
-import java.io.IOException;
-
-import static java.lang.Thread.sleep;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * DeprecatedParquetInputFormat is used by Cascading. It initializes its record reader through an initialize method
- * that takes different parameters than ParquetInputFormat's.
- * @author Tianshuo Deng
- */
-public class DeprecatedInputFormatTest {
-  final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
-  final Path inputPath = new Path("src/test/java/parquet/hadoop/example/TestInputOutputFormat.java");
-  final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
-  Job writeJob;
-  JobConf jobConf;
-  RunningJob mapRedJob;
-  private String writeSchema;
-  private String readSchema;
-  private Configuration conf;
-
-  @Before
-  public void setUp() {
-    conf = new Configuration();
-    jobConf = new JobConf();
-    writeSchema = "message example {\n" +
-            "required int32 line;\n" +
-            "required binary content;\n" +
-            "}";
-
-    readSchema = "message example {\n" +
-            "required int32 line;\n" +
-            "required binary content;\n" +
-            "}";
-  }
-
-  private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-
-    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
-    fileSystem.delete(parquetPath, true);
-    fileSystem.delete(outputPath, true);
-    {
-      writeJob = new Job(conf, "write");
-      TextInputFormat.addInputPath(writeJob, inputPath);
-      writeJob.setInputFormatClass(TextInputFormat.class);
-      writeJob.setNumReduceTasks(0);
-      ExampleOutputFormat.setCompression(writeJob, codec);
-      ExampleOutputFormat.setOutputPath(writeJob, parquetPath);
-      writeJob.setOutputFormatClass(ExampleOutputFormat.class);
-      writeJob.setMapperClass(ReadMapper.class);
-      ExampleOutputFormat.setSchema(
-              writeJob,
-              MessageTypeParser.parseMessageType(
-                      writeSchema));
-      writeJob.submit();
-      waitForJob(writeJob);
-    }
-    {
-      jobConf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
-      jobConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, GroupReadSupport.class.getCanonicalName());
-      jobConf.setInputFormat(MyDeprecatedInputFormat.class);
-      MyDeprecatedInputFormat.setInputPaths(jobConf, parquetPath);
-      jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
-      org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(jobConf, outputPath);
-      jobConf.setMapperClass(DeprecatedWriteMapper.class);
-      jobConf.setNumReduceTasks(0);
-      mapRedJob = JobClient.runJob(jobConf);
-    }
-  }
-
-  @Test
-  public void testReadWriteWithCountDeprecated() throws Exception {
-    runMapReduceJob(CompressionCodecName.GZIP);
-    assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue() > 0L);
-    assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue() > 0L);
-    assertTrue(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue()
-            == mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue());
-            // not testing the time-read counter since it could be zero because the test data is so small
-  }
-
-  @Test
-  public void testReadWriteWithoutCounter() throws Exception {
-    jobConf.set("parquet.benchmark.time.read", "false");
-    jobConf.set("parquet.benchmark.bytes.total", "false");
-    jobConf.set("parquet.benchmark.bytes.read", "false");
-    runMapReduceJob(CompressionCodecName.GZIP);
-    assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytesread").getValue(), 0L);
-    assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("bytestotal").getValue(), 0L);
-    assertEquals(mapRedJob.getCounters().getGroup("parquet").getCounterForName("timeread").getValue(), 0L);
-  }
-
-  private void waitForJob(Job job) throws InterruptedException, IOException {
-    while (!job.isComplete()) {
-      System.out.println("waiting for job " + job.getJobName());
-      sleep(100);
-    }
-    System.out.println("status for job " + job.getJobName() + ": " + (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
-    if (!job.isSuccessful()) {
-      throw new RuntimeException("job failed " + job.getJobName());
-    }
-  }
-
-  public static class ReadMapper extends Mapper<LongWritable, Text, Void, Group> {
-    private SimpleGroupFactory factory;
-
-    protected void setup(Context context) throws IOException, InterruptedException {
-      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(ContextUtil.getConfiguration(context)));
-    }
-
-    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-      Group group = factory.newGroup()
-              .append("line", (int) key.get())
-              .append("content", value.toString());
-      context.write(null, group);
-    }
-  }
-
-  public static class DeprecatedWriteMapper implements org.apache.hadoop.mapred.Mapper<Void, Container<Group>, LongWritable, Text> {
-
-    @Override
-    public void map(Void aVoid, Container<Group> valueContainer, OutputCollector<LongWritable, Text> longWritableTextOutputCollector, Reporter reporter) throws IOException {
-      Group value = valueContainer.get();
-      longWritableTextOutputCollector.collect(new LongWritable(value.getInteger("line", 0)), new Text(value.getString("content", 0)));
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public void configure(JobConf entries) {
-    }
-  }
-
-  static class MyDeprecatedInputFormat extends DeprecatedParquetInputFormat<Group> {
-
-  }
-}
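
For readers coming to this from the Cascading/mapred side, the read configuration performed by runMapReduceJob above condenses to the following (a sketch under the test's assumptions: the schema string and the already-written Parquet input come from the test, and DeprecatedWriteMapper is the mapper class defined above, assumed to be on the classpath):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;

import parquet.hadoop.ParquetInputFormat;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.example.GroupReadSupport;
import parquet.hadoop.mapred.DeprecatedParquetInputFormat;

public class DeprecatedReadJobSketch {
  public static RunningJob run(Path parquetPath, Path outputPath) throws Exception {
    JobConf jobConf = new JobConf();
    // projection schema handed to GroupReadSupport
    jobConf.set(ReadSupport.PARQUET_READ_SCHEMA,
        "message example { required int32 line; required binary content; }");
    jobConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
        GroupReadSupport.class.getCanonicalName());
    jobConf.setInputFormat(DeprecatedParquetInputFormat.class);
    DeprecatedParquetInputFormat.setInputPaths(jobConf, parquetPath);
    jobConf.setOutputFormat(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(jobConf, outputPath);
    // values arrive wrapped in Container<Group>, as in DeprecatedWriteMapper above
    jobConf.setMapperClass(DeprecatedWriteMapper.class);
    jobConf.setNumReduceTasks(0);
    return JobClient.runJob(jobConf);
  }
}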

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedOutputFormatTest.java b/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedOutputFormatTest.java
deleted file mode 100644
index b09ba28..0000000
--- a/parquet-hadoop/src/test/java/parquet/hadoop/DeprecatedOutputFormatTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.*;
-import org.junit.Before;
-import org.junit.Test;
-
-import parquet.example.data.Group;
-import parquet.example.data.simple.SimpleGroupFactory;
-import parquet.hadoop.example.GroupWriteSupport;
-import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageTypeParser;
-
-import java.io.IOException;
-
-/**
- * Tests DeprecatedParquetOutputFormat, the output-format counterpart of DeprecatedParquetInputFormat
- * for the old org.apache.hadoop.mapred API.
- * @author Tianshuo Deng
- */
-public class DeprecatedOutputFormatTest {
-  final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet");
-  final Path inputPath = new Path("src/test/java/parquet/hadoop/example/TestInputOutputFormat.java");
-  final Path outputPath = new Path("target/test/example/TestInputOutputFormat/out");
-  JobConf jobConf;
-  RunningJob mapRedJob;
-  private String writeSchema;
-  private Configuration conf;
-
-  @Before
-  public void setUp() {
-    conf = new Configuration();
-    jobConf = new JobConf();
-    writeSchema = "message example {\n" +
-            "required int32 line;\n" +
-            "required binary content;\n" +
-            "}";
-  }
-
-  private void runMapReduceJob(CompressionCodecName codec) throws IOException, ClassNotFoundException, InterruptedException {
-
-    final FileSystem fileSystem = parquetPath.getFileSystem(conf);
-    fileSystem.delete(parquetPath, true);
-    fileSystem.delete(outputPath, true);
-    {
-      jobConf.setInputFormat(TextInputFormat.class);
-      TextInputFormat.addInputPath(jobConf, inputPath);
-      jobConf.setNumReduceTasks(0);
-
-      jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
-      DeprecatedParquetOutputFormat.setCompression(jobConf, codec);
-      DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
-      DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, GroupWriteSupport.class);
-      GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), jobConf);
-
-      jobConf.setMapperClass(DeprecatedMapper.class);
-      mapRedJob = JobClient.runJob(jobConf);
-    }
-  }
-
-  @Test
-  public void testReadWrite() throws Exception {
-    runMapReduceJob(CompressionCodecName.GZIP);
-    assert(mapRedJob.isSuccessful());
-  }
-
-  public static class DeprecatedMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Void, Group> {
-    private SimpleGroupFactory factory;
-
-    public void configure(JobConf job) {
-      factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
-    }
-
-    @Override
-    public void map(LongWritable key, Text value, OutputCollector<Void, Group> outputCollector, Reporter reporter) throws IOException {
-      Group group = factory.newGroup()
-              .append("line", (int) key.get())
-              .append("content", value.toString());
-      outputCollector.collect(null, group);
-    }
-
-    @Override
-    public void close() {
-    }
-  }
-}
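
The write side is the mirror image; condensed from runMapReduceJob above (again a sketch reusing the test's schema string and its DeprecatedMapper class, assumed to be on the classpath):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import parquet.hadoop.example.GroupWriteSupport;
import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageTypeParser;

public class DeprecatedWriteJobSketch {
  public static void run(Path inputPath, Path parquetPath) throws Exception {
    JobConf jobConf = new JobConf();
    jobConf.setInputFormat(TextInputFormat.class);
    TextInputFormat.addInputPath(jobConf, inputPath);
    jobConf.setNumReduceTasks(0);

    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
    DeprecatedParquetOutputFormat.setCompression(jobConf, CompressionCodecName.GZIP);
    DeprecatedParquetOutputFormat.setOutputPath(jobConf, parquetPath);
    DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, GroupWriteSupport.class);
    GroupWriteSupport.setSchema(
        MessageTypeParser.parseMessageType(
            "message example { required int32 line; required binary content; }"),
        jobConf);

    // DeprecatedMapper above turns each (line number, line text) pair into a Group
    jobConf.setMapperClass(DeprecatedMapper.class);
    JobClient.runJob(jobConf);
  }
}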

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
deleted file mode 100644
index 28f6be2..0000000
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.inOrder;
-import static parquet.column.Encoding.PLAIN;
-import static parquet.column.Encoding.RLE;
-import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static parquet.hadoop.metadata.CompressionCodecName.GZIP;
-import static parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
-import static parquet.schema.OriginalType.UTF8;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-
-import parquet.bytes.BytesInput;
-import parquet.bytes.LittleEndianDataInputStream;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.BinaryStatistics;
-import parquet.column.statistics.Statistics;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-import parquet.schema.Types;
-
-public class TestColumnChunkPageWriteStore {
-
-  private int pageSize = 1024;
-  private int initialSize = 1024;
-  private Configuration conf;
-
-  @Before
-  public void initConfiguration() {
-    this.conf = new Configuration();
-  }
-
-  @Test
-  public void test() throws Exception {
-    Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
-    Path root = file.getParent();
-    FileSystem fs = file.getFileSystem(conf);
-    if (fs.exists(root)) {
-      fs.delete(root, true);
-    }
-    fs.mkdirs(root);
-    MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
-    ColumnDescriptor col = schema.getColumns().get(0);
-    Encoding dataEncoding = PLAIN;
-    int valueCount = 10;
-    int d = 1;
-    int r = 2;
-    int v = 3;
-    BytesInput definitionLevels = BytesInput.fromInt(d);
-    BytesInput repetitionLevels = BytesInput.fromInt(r);
-    Statistics<?> statistics = new BinaryStatistics();
-    BytesInput data = BytesInput.fromInt(v);
-    int rowCount = 5;
-    int nullCount = 1;
-
-    {
-      ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
-      writer.start();
-      writer.startBlock(rowCount);
-      {
-        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema, initialSize);
-        PageWriter pageWriter = store.getPageWriter(col);
-        pageWriter.writePageV2(
-            rowCount, nullCount, valueCount,
-            repetitionLevels, definitionLevels,
-            dataEncoding, data,
-            statistics);
-        store.flushToFileWriter(writer);
-      }
-      writer.endBlock();
-      writer.end(new HashMap<String, String>());
-    }
-
-    {
-      ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
-      ParquetFileReader reader = new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
-      PageReadStore rowGroup = reader.readNextRowGroup();
-      PageReader pageReader = rowGroup.getPageReader(col);
-      DataPageV2 page = (DataPageV2)pageReader.readPage();
-      assertEquals(rowCount, page.getRowCount());
-      assertEquals(nullCount, page.getNullCount());
-      assertEquals(valueCount, page.getValueCount());
-      assertEquals(d, intValue(page.getDefinitionLevels()));
-      assertEquals(r, intValue(page.getRepetitionLevels()));
-      assertEquals(dataEncoding, page.getDataEncoding());
-      assertEquals(v, intValue(page.getData()));
-      assertEquals(statistics.toString(), page.getStatistics().toString());
-      reader.close();
-    }
-  }
-
-  private int intValue(BytesInput in) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    in.writeAllTo(baos);
-    LittleEndianDataInputStream in = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    int i = in.readInt();
-    in.close();
-    return i;
-  }
-
-  @Test
-  public void testColumnOrderV1() throws IOException {
-    ParquetFileWriter mockFileWriter = Mockito.mock(ParquetFileWriter.class);
-    InOrder inOrder = inOrder(mockFileWriter);
-    MessageType schema = Types.buildMessage()
-        .required(BINARY).as(UTF8).named("a_string")
-        .required(INT32).named("an_int")
-        .required(INT64).named("a_long")
-        .required(FLOAT).named("a_float")
-        .required(DOUBLE).named("a_double")
-        .named("order_test");
-
-    BytesInput fakeData = BytesInput.fromInt(34);
-    int fakeCount = 3;
-    BinaryStatistics fakeStats = new BinaryStatistics();
-
-    ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
-        compressor(UNCOMPRESSED), schema, initialSize);
-
-    for (ColumnDescriptor col : schema.getColumns()) {
-      PageWriter pageWriter = store.getPageWriter(col);
-      pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN);
-    }
-
-    // flush to the mock writer
-    store.flushToFileWriter(mockFileWriter);
-
-    for (ColumnDescriptor col : schema.getColumns()) {
-      inOrder.verify(mockFileWriter).startColumn(
-          eq(col), eq((long) fakeCount), eq(UNCOMPRESSED));
-    }
-  }
-
-  private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
-    return new CodecFactory(conf).getCompressor(codec, pageSize);
-  }
-}
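
Isolating the read-back half of the round-trip in the test above (a sketch; it assumes a file has already been written at the same path with the same single-column schema):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.column.ColumnDescriptor;
import parquet.column.page.DataPageV2;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

import static parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class PageReadBackSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
    MessageType schema =
        MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
    ColumnDescriptor col = schema.getColumns().get(0);

    // read the footer without row-group filtering, then the first row group
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
    ParquetFileReader reader =
        new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
    try {
      PageReadStore rowGroup = reader.readNextRowGroup();
      PageReader pageReader = rowGroup.getPageReader(col);
      DataPageV2 page = (DataPageV2) pageReader.readPage();
      System.out.println("rows=" + page.getRowCount()
          + " nulls=" + page.getNullCount()
          + " values=" + page.getValueCount());
    } finally {
      reader.close();
    }
  }
}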