You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/22 22:10:44 UTC

[7/9] flink git commit: [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
deleted file mode 100644
index 0c9c549..0000000
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static org.apache.flink.orc.OrcUtils.fillRows;
-
-/**
- * InputFormat to read ORC data.
- * For Optimization, reading is done in batch instead of a single row.
- */
-public class RowOrcInputFormat
-	extends FileInputFormat<Row>
-	implements ResultTypeQueryable<Row> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
-	private static final int BATCH_SIZE = 1024;
-
-	private org.apache.hadoop.conf.Configuration config;
-	private TypeDescription schema;
-	private int[] fieldMapping;
-
-	private transient RowTypeInfo rowType;
-	private transient RecordReader orcRowsReader;
-	private transient VectorizedRowBatch rowBatch;
-	private transient Row[] rows;
-
-	private transient int rowInBatch;
-
-	public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
-		this(path, TypeDescription.fromString(schemaString), orcConfig);
-	}
-
-	public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
-		super(new Path(path));
-		this.unsplittable = false;
-		this.schema = orcSchema;
-		this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
-		this.config = orcConfig;
-
-		this.fieldMapping = new int[this.schema.getChildren().size()];
-		for (int i = 0; i < fieldMapping.length; i++) {
-			this.fieldMapping[i] = i;
-		}
-
-	}
-
-	public void setFieldMapping(int[] fieldMapping) {
-		this.fieldMapping = fieldMapping;
-		// adapt result type
-
-		TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length];
-		String[] fieldNames = new String[fieldMapping.length];
-		for (int i = 0; i < fieldMapping.length; i++) {
-			fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
-			fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]];
-		}
-		this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
-	}
-
-	private boolean[] computeProjectionMask() {
-		boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
-		for (int inIdx : fieldMapping) {
-			TypeDescription fieldSchema = schema.getChildren().get(inIdx);
-			for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
-				projectionMask[i] = true;
-			}
-		}
-		return projectionMask;
-	}
-
-	@Override
-	public void openInputFormat() throws IOException {
-		super.openInputFormat();
-		this.rows = new Row[BATCH_SIZE];
-		for (int i = 0; i < BATCH_SIZE; i++) {
-			rows[i] = new Row(fieldMapping.length);
-		}
-	}
-
-	@Override
-	public void open(FileInputSplit fileSplit) throws IOException {
-
-		this.currentSplit = fileSplit;
-		Preconditions.checkArgument(this.splitStart == 0, "ORC files must be read from the start.");
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Opening ORC file " + fileSplit.getPath());
-		}
-
-		org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
-
-		Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(config));
-
-		Reader.Options options = orcReader.options()
-			.range(fileSplit.getStart(), fileSplit.getLength())
-			.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(config))
-			.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(config))
-			.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(config));
-
-		options.include(computeProjectionMask());
-
-		// check that schema of file is as expected
-		if (!this.schema.equals(orcReader.getSchema())) {
-
-			throw new RuntimeException("Invalid schema for file at " + this.filePath +
-				" Expected:" + this.schema + " Actual: " + orcReader.getSchema());
-		}
-
-		this.orcRowsReader = orcReader.rows(options);
-
-		// assign ids
-		this.schema.getId();
-
-		this.rowBatch = schema.createRowBatch(BATCH_SIZE);
-		rowInBatch = 0;
-	}
-
-	@Override
-	public void close() throws IOException {
-
-		if (orcRowsReader != null) {
-			this.orcRowsReader.close();
-		}
-		this.orcRowsReader = null;
-
-	}
-
-	@Override
-	public void closeInputFormat() throws IOException {
-		this.rows = null;
-		this.rows = null;
-		this.schema = null;
-		this.rowBatch = null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !ensureBatch();
-	}
-
-	private boolean ensureBatch() throws IOException {
-
-		if (rowInBatch >= rowBatch.size) {
-			rowInBatch = 0;
-			boolean moreRows = orcRowsReader.nextBatch(rowBatch);
-
-			if (moreRows) {
-				// read rows
-				fillRows(rows, schema, rowBatch, fieldMapping);
-			}
-			return moreRows;
-		}
-
-		return true;
-	}
-
-	@Override
-	public Row nextRecord(Row reuse) throws IOException {
-		return rows[this.rowInBatch++];
-	}
-
-	@Override
-	public TypeInformation<Row> getProducedType() {
-		return rowType;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		this.config.write(out);
-		out.writeUTF(schema.toString());
-
-		out.writeInt(fieldMapping.length);
-		for (int f : fieldMapping) {
-			out.writeInt(f);
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-
-		if (this.config == null) {
-			this.config = configuration;
-		}
-		this.schema = TypeDescription.fromString(in.readUTF());
-
-		this.fieldMapping = new int[in.readInt()];
-		for (int i = 0; i < fieldMapping.length; i++) {
-			this.fieldMapping[i] = in.readInt();
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
new file mode 100644
index 0000000..0efe41f
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link OrcRowInputFormat}.
+ *
+ */
+public class OrcRowInputFormatTest {
+
+	private OrcRowInputFormat rowOrcInputFormat;
+
+	@After
+	public void tearDown() throws IOException {
+		if (rowOrcInputFormat != null) {
+			rowOrcInputFormat.close();
+			rowOrcInputFormat.closeInputFormat();
+		}
+		rowOrcInputFormat = null;
+	}
+
+	private static final String TEST_FILE_FLAT = "test-data-flat.orc";
+	private static final String TEST_SCHEMA_FLAT =
+		"struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>";
+
+	private static final String TEST_FILE_NESTED = "test-data-nested.orc";
+	private static final String TEST_SCHEMA_NESTED =
+		"struct<" +
+			"boolean1:boolean," +
+			"byte1:tinyint," +
+			"short1:smallint," +
+			"int1:int," +
+			"long1:bigint," +
+			"float1:float," +
+			"double1:double," +
+			"bytes1:binary," +
+			"string1:string," +
+			"middle:struct<" +
+				"list:array<" +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">" +
+			">," +
+			"list:array<" +
+				"struct<" +
+					"int1:int," +
+					"string1:string" +
+				">" +
+			">," +
+			"map:map<" +
+				"string," +
+				"struct<" +
+					"int1:int," +
+					"string1:string" +
+				">" +
+			">" +
+		">";
+
+	private static final String TEST_FILE_TIMETYPES = "test-data-timetypes.orc";
+	private static final String TEST_SCHEMA_TIMETYPES = "struct<time:timestamp,date:date>";
+
+	private static final String TEST_FILE_DECIMAL = "test-data-decimal.orc";
+	private static final String TEST_SCHEMA_DECIMAL = "struct<_col0:decimal(10,5)>";
+
+	private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
+	private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
+
+	@Test(expected = FileNotFoundException.class)
+	public void testInvalidPath() throws IOException{
+		rowOrcInputFormat =
+			new OrcRowInputFormat("/does/not/exist", TEST_SCHEMA_FLAT, new Configuration());
+		rowOrcInputFormat.openInputFormat();
+		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
+		rowOrcInputFormat.open(inputSplits[0]);
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidProjection1() throws IOException{
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+		int[] projectionMask = {1, 2, 3, -1};
+		rowOrcInputFormat.selectFields(projectionMask);
+	}
+
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testInvalidProjection2() throws IOException{
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+		int[] projectionMask = {1, 2, 3, 9};
+		rowOrcInputFormat.selectFields(projectionMask);
+	}
+
+	@Test
+	public void testProjectionMaskNested() throws IOException{
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+		// mock options to check configuration of ORC reader
+		Reader.Options options = new Reader.Options();
+		doReturn(options).when(spy).getOptions(any());
+
+		spy.selectFields(9, 11, 2);
+		spy.openInputFormat();
+		FileInputSplit[] splits = spy.createInputSplits(1);
+		spy.open(splits[0]);
+
+		// top-level struct is false
+		boolean[] expected = new boolean[]{
+			false, // top level
+			false, false, // flat fields 0, 1 are out
+			true, // flat field 2 is in
+			false, false, false, false, false, false, // flat fields 3, 4, 5, 6, 7, 8 are out
+			true, true, true, true, true, // nested field 9 is in
+			false, false, false, false, // nested field 10 is out
+			true, true, true, true, true}; // nested field 11 is in
+		assertArrayEquals(expected, options.getInclude());
+	}
+
+	@Test
+	public void testSplitStripesGivenSplits() throws IOException {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+		OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+		// mock options to check configuration of ORC reader
+		Reader.Options options = spy(new Reader.Options());
+		doReturn(options).when(spy).getOptions(any());
+
+		FileInputSplit[] splits = spy.createInputSplits(3);
+
+		spy.openInputFormat();
+		spy.open(splits[0]);
+		verify(options).range(eq(3L), eq(137005L));
+		spy.open(splits[1]);
+		verify(options).range(eq(137008L), eq(136182L));
+		spy.open(splits[2]);
+		verify(options).range(eq(273190L), eq(123633L));
+	}
+
+	@Test
+	public void testSplitStripesCustomSplits() throws IOException {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+		OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+		// mock list of stripes
+		List<StripeInformation> stripes = new ArrayList<>();
+		StripeInformation stripe1 = mock(StripeInformation.class);
+		when(stripe1.getOffset()).thenReturn(10L);
+		when(stripe1.getLength()).thenReturn(90L);
+		StripeInformation stripe2 = mock(StripeInformation.class);
+		when(stripe2.getOffset()).thenReturn(100L);
+		when(stripe2.getLength()).thenReturn(100L);
+		StripeInformation stripe3 = mock(StripeInformation.class);
+		when(stripe3.getOffset()).thenReturn(200L);
+		when(stripe3.getLength()).thenReturn(100L);
+		StripeInformation stripe4 = mock(StripeInformation.class);
+		when(stripe4.getOffset()).thenReturn(300L);
+		when(stripe4.getLength()).thenReturn(100L);
+		StripeInformation stripe5 = mock(StripeInformation.class);
+		when(stripe5.getOffset()).thenReturn(400L);
+		when(stripe5.getLength()).thenReturn(100L);
+		stripes.add(stripe1);
+		stripes.add(stripe2);
+		stripes.add(stripe3);
+		stripes.add(stripe4);
+		stripes.add(stripe5);
+		doReturn(stripes).when(spy).getStripes(any());
+
+		// mock options to check configuration of ORC reader
+		Reader.Options options = spy(new Reader.Options());
+		doReturn(options).when(spy).getOptions(any());
+
+		spy.openInputFormat();
+		// split ranging 2 stripes
+		spy.open(new FileInputSplit(0, new Path(getPath(TEST_FILE_FLAT)), 0, 150, new String[]{}));
+		verify(options).range(eq(10L), eq(190L));
+		// split ranging 0 stripes
+		spy.open(new FileInputSplit(1, new Path(getPath(TEST_FILE_FLAT)), 150, 10, new String[]{}));
+		verify(options).range(eq(0L), eq(0L));
+		// split ranging 1 stripe
+		spy.open(new FileInputSplit(2, new Path(getPath(TEST_FILE_FLAT)), 160, 41, new String[]{}));
+		verify(options).range(eq(200L), eq(100L));
+		// split ranging 2 stripes
+		spy.open(new FileInputSplit(3, new Path(getPath(TEST_FILE_FLAT)), 201, 299, new String[]{}));
+		verify(options).range(eq(300L), eq(200L));
+	}
+
+	@Test
+	public void testProducedType() throws IOException {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo);
+		RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType();
+
+		assertArrayEquals(
+			new TypeInformation[]{
+				// primitives
+				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+				// binary
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+				// string
+				Types.STRING,
+				// struct
+				Types.ROW_NAMED(
+					new String[]{"list"},
+					ObjectArrayTypeInfo.getInfoFor(
+						Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))),
+				// list
+				ObjectArrayTypeInfo.getInfoFor(
+					Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING)),
+				// map
+				new MapTypeInfo<>(Types.STRING, Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))
+			},
+			producedType.getFieldTypes());
+		assertArrayEquals(
+			new String[]{"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map"},
+			producedType.getFieldNames());
+	}
+
+	@Test
+	public void testProducedTypeWithProjection() throws IOException {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		rowOrcInputFormat.selectFields(9, 3, 7, 10);
+
+		assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo);
+		RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType();
+
+		assertArrayEquals(
+			new TypeInformation[]{
+				// struct
+				Types.ROW_NAMED(
+					new String[]{"list"},
+					ObjectArrayTypeInfo.getInfoFor(
+						Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))),
+				// int
+				Types.INT,
+				// binary
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+				// list
+				ObjectArrayTypeInfo.getInfoFor(
+					Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))
+			},
+			producedType.getFieldTypes());
+		assertArrayEquals(
+			new String[]{"middle", "int1", "bytes1", "list"},
+			producedType.getFieldNames());
+	}
+
+	@Test
+	public void testSerialization() throws Exception {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+		rowOrcInputFormat.selectFields(0, 4, 1);
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
+
+		byte[] bytes = InstantiationUtil.serializeObject(rowOrcInputFormat);
+		OrcRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
+
+		FileInputSplit[] splits = copy.createInputSplits(1);
+		copy.openInputFormat();
+		copy.open(splits[0]);
+		assertFalse(copy.reachedEnd());
+		Row row = copy.nextRecord(null);
+
+		assertNotNull(row);
+		assertEquals(3, row.getArity());
+		// check first row
+		assertEquals(1, row.getField(0));
+		assertEquals(500, row.getField(1));
+		assertEquals("M", row.getField(2));
+	}
+
+	@Test
+	public void testNumericBooleanStringPredicates() throws Exception {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		rowOrcInputFormat.selectFields(0, 1, 2, 3, 4, 5, 6, 8);
+
+		// boolean pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("boolean1", PredicateLeaf.Type.BOOLEAN, false));
+		// tinyint pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.LessThan("byte1", PredicateLeaf.Type.LONG, 1));
+		// smallint pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.LessThanEquals("short1", PredicateLeaf.Type.LONG, 1024));
+		// int pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Between("int1", PredicateLeaf.Type.LONG, -1, 65536));
+		// bigint pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("long1", PredicateLeaf.Type.LONG, 9223372036854775807L));
+		// float pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("float1", PredicateLeaf.Type.FLOAT, 1.0));
+		// double pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("double1", PredicateLeaf.Type.FLOAT, -15.0));
+		// string pred (IS NULL)
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.IsNull("string1", PredicateLeaf.Type.STRING));
+		// string pred
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello"));
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		rowOrcInputFormat.openInputFormat();
+
+		// mock options to check configuration of ORC reader
+		OrcRowInputFormat spy = spy(rowOrcInputFormat);
+		Reader.Options options = new Reader.Options();
+		doReturn(options).when(spy).getOptions(any());
+
+		spy.openInputFormat();
+		spy.open(splits[0]);
+
+		// verify predicate configuration
+		SearchArgument sarg = options.getSearchArgument();
+		assertNotNull(sarg);
+		assertEquals("(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 leaf-6 leaf-7 leaf-8)", sarg.getExpression().toString());
+		assertEquals(9, sarg.getLeaves().size());
+		List<PredicateLeaf> leaves = sarg.getLeaves();
+		assertEquals("(EQUALS boolean1 false)", leaves.get(0).toString());
+		assertEquals("(LESS_THAN byte1 1)", leaves.get(1).toString());
+		assertEquals("(LESS_THAN_EQUALS short1 1024)", leaves.get(2).toString());
+		assertEquals("(BETWEEN int1 -1 65536)", leaves.get(3).toString());
+		assertEquals("(EQUALS long1 9223372036854775807)", leaves.get(4).toString());
+		assertEquals("(EQUALS float1 1.0)", leaves.get(5).toString());
+		assertEquals("(EQUALS double1 -15.0)", leaves.get(6).toString());
+		assertEquals("(IS_NULL string1)", leaves.get(7).toString());
+		assertEquals("(EQUALS string1 hello)", leaves.get(8).toString());
+	}
+
+	@Test
+	public void testTimePredicates() throws Exception {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
+
+		rowOrcInputFormat.addPredicate(
+			// OR
+			new OrcRowInputFormat.Or(
+				// timestamp pred
+				new OrcRowInputFormat.Equals("time", PredicateLeaf.Type.TIMESTAMP, Timestamp.valueOf("1900-05-05 12:34:56.100")),
+				// date pred
+				new OrcRowInputFormat.Equals("date", PredicateLeaf.Type.DATE, Date.valueOf("1900-12-25")))
+			);
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		rowOrcInputFormat.openInputFormat();
+
+		// mock options to check configuration of ORC reader
+		OrcRowInputFormat spy = spy(rowOrcInputFormat);
+		Reader.Options options = new Reader.Options();
+		doReturn(options).when(spy).getOptions(any());
+
+		spy.openInputFormat();
+		spy.open(splits[0]);
+
+		// verify predicate configuration
+		SearchArgument sarg = options.getSearchArgument();
+		assertNotNull(sarg);
+		assertEquals("(or leaf-0 leaf-1)", sarg.getExpression().toString());
+		assertEquals(2, sarg.getLeaves().size());
+		List<PredicateLeaf> leaves = sarg.getLeaves();
+		assertEquals("(EQUALS time 1900-05-05 12:34:56.1)", leaves.get(0).toString());
+		assertEquals("(EQUALS date 1900-12-25)", leaves.get(1).toString());
+	}
+
+	@Test
+	public void testDecimalPredicate() throws Exception {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
+
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Not(
+				// decimal pred
+				new OrcRowInputFormat.Equals("_col0", PredicateLeaf.Type.DECIMAL, BigDecimal.valueOf(-1000.5))));
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		rowOrcInputFormat.openInputFormat();
+
+		// mock options to check configuration of ORC reader
+		OrcRowInputFormat spy = spy(rowOrcInputFormat);
+		Reader.Options options = new Reader.Options();
+		doReturn(options).when(spy).getOptions(any());
+
+		spy.openInputFormat();
+		spy.open(splits[0]);
+
+		// verify predicate configuration
+		SearchArgument sarg = options.getSearchArgument();
+		assertNotNull(sarg);
+		assertEquals("(not leaf-0)", sarg.getExpression().toString());
+		assertEquals(1, sarg.getLeaves().size());
+		List<PredicateLeaf> leaves = sarg.getLeaves();
+		assertEquals("(EQUALS _col0 -1000.5)", leaves.get(0).toString());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testPredicateWithInvalidColumn() throws Exception {
+		rowOrcInputFormat =
+			new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("unknown", PredicateLeaf.Type.LONG, 42));
+	}
+
+	@Test
+	public void testReadNestedFile() throws IOException{
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		Row row = rowOrcInputFormat.nextRecord(null);
+
+		// validate first row
+		assertNotNull(row);
+		assertEquals(12, row.getArity());
+		assertEquals(false, row.getField(0));
+		assertEquals((byte) 1, row.getField(1));
+		assertEquals((short) 1024, row.getField(2));
+		assertEquals(65536, row.getField(3));
+		assertEquals(9223372036854775807L, row.getField(4));
+		assertEquals(1.0f, row.getField(5));
+		assertEquals(-15.0d, row.getField(6));
+		assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(7));
+		assertEquals("hi", row.getField(8));
+		// check nested field
+		assertTrue(row.getField(9) instanceof Row);
+		Row nested1 = (Row) row.getField(9);
+		assertEquals(1, nested1.getArity());
+		assertTrue(nested1.getField(0) instanceof Object[]);
+		Object[] nestedList1 = (Object[]) nested1.getField(0);
+		assertEquals(2, nestedList1.length);
+		assertEquals(Row.of(1, "bye"), nestedList1[0]);
+		assertEquals(Row.of(2, "sigh"), nestedList1[1]);
+		// check list
+		assertTrue(row.getField(10) instanceof Object[]);
+		Object[] list1 = (Object[]) row.getField(10);
+		assertEquals(2, list1.length);
+		assertEquals(Row.of(3, "good"), list1[0]);
+		assertEquals(Row.of(4, "bad"), list1[1]);
+		// check map
+		assertTrue(row.getField(11) instanceof HashMap);
+		HashMap map1 = (HashMap) row.getField(11);
+		assertEquals(0, map1.size());
+
+		// read second row
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		row = rowOrcInputFormat.nextRecord(null);
+
+		// validate second row
+		assertNotNull(row);
+		assertEquals(12, row.getArity());
+		assertEquals(true, row.getField(0));
+		assertEquals((byte) 100, row.getField(1));
+		assertEquals((short) 2048, row.getField(2));
+		assertEquals(65536, row.getField(3));
+		assertEquals(9223372036854775807L, row.getField(4));
+		assertEquals(2.0f, row.getField(5));
+		assertEquals(-5.0d, row.getField(6));
+		assertArrayEquals(new byte[]{}, (byte[]) row.getField(7));
+		assertEquals("bye", row.getField(8));
+		// check nested field
+		assertTrue(row.getField(9) instanceof Row);
+		Row nested2 = (Row) row.getField(9);
+		assertEquals(1, nested2.getArity());
+		assertTrue(nested2.getField(0) instanceof Object[]);
+		Object[] nestedList2 = (Object[]) nested2.getField(0);
+		assertEquals(2, nestedList2.length);
+		assertEquals(Row.of(1, "bye"), nestedList2[0]);
+		assertEquals(Row.of(2, "sigh"), nestedList2[1]);
+		// check list
+		assertTrue(row.getField(10) instanceof Object[]);
+		Object[] list2 = (Object[]) row.getField(10);
+		assertEquals(3, list2.length);
+		assertEquals(Row.of(100000000, "cat"), list2[0]);
+		assertEquals(Row.of(-100000, "in"), list2[1]);
+		assertEquals(Row.of(1234, "hat"), list2[2]);
+		// check map
+		assertTrue(row.getField(11) instanceof HashMap);
+		HashMap map = (HashMap) row.getField(11);
+		assertEquals(2, map.size());
+		assertEquals(Row.of(5, "chani"), map.get("chani"));
+		assertEquals(Row.of(1, "mauddib"), map.get("mauddib"));
+
+		assertTrue(rowOrcInputFormat.reachedEnd());
+	}
+
+	@Test
+	public void testReadTimeTypeFile() throws IOException{
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		Row row = rowOrcInputFormat.nextRecord(null);
+
+		// validate first row
+		assertNotNull(row);
+		assertEquals(2, row.getArity());
+		assertEquals(Timestamp.valueOf("1900-05-05 12:34:56.1"), row.getField(0));
+		assertEquals(Date.valueOf("1900-12-25"), row.getField(1));
+
+		// check correct number of rows
+		long cnt = 1;
+		while (!rowOrcInputFormat.reachedEnd()) {
+			assertNotNull(rowOrcInputFormat.nextRecord(null));
+			cnt++;
+		}
+		assertEquals(70000, cnt);
+	}
+
+	@Test
+	public void testReadDecimalTypeFile() throws IOException{
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		Row row = rowOrcInputFormat.nextRecord(null);
+
+		// validate first row
+		assertNotNull(row);
+		assertEquals(1, row.getArity());
+		assertEquals(BigDecimal.valueOf(-1000.5d), row.getField(0));
+
+		// check correct number of rows
+		long cnt = 1;
+		while (!rowOrcInputFormat.reachedEnd()) {
+			assertNotNull(rowOrcInputFormat.nextRecord(null));
+			cnt++;
+		}
+		assertEquals(6000, cnt);
+	}
+
+	@Test
+	public void testReadNestedListFile() throws Exception {
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTEDLIST), TEST_SCHEMA_NESTEDLIST, new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(1, row.getArity());
+
+			// outer list
+			Object[] list = (Object[]) row.getField(0);
+			assertEquals(1, list.length);
+
+			// nested list of rows
+			Row[] nestedRows = (Row[]) list[0];
+			assertEquals(1, nestedRows.length);
+			assertEquals(1, nestedRows[0].getArity());
+
+			// verify list value
+			assertEquals(cnt, nestedRows[0].getField(0));
+			cnt++;
+		}
+		// number of rows in file
+		assertEquals(100, cnt);
+	}
+
+	@Test
+	public void testReadWithProjection() throws IOException{
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+		rowOrcInputFormat.selectFields(7, 0, 10, 8);
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		Row row = rowOrcInputFormat.nextRecord(null);
+
+		// validate first row
+		assertNotNull(row);
+		assertEquals(4, row.getArity());
+		// check binary
+		assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(0));
+		// check boolean
+		assertEquals(false, row.getField(1));
+		// check list
+		assertTrue(row.getField(2) instanceof Object[]);
+		Object[] list1 = (Object[]) row.getField(2);
+		assertEquals(2, list1.length);
+		assertEquals(Row.of(3, "good"), list1[0]);
+		assertEquals(Row.of(4, "bad"), list1[1]);
+		// check string
+		assertEquals("hi", row.getField(3));
+
+		// check that there is a second row with four fields
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		row = rowOrcInputFormat.nextRecord(null);
+		assertNotNull(row);
+		assertEquals(4, row.getArity());
+		assertTrue(rowOrcInputFormat.reachedEnd());
+	}
+
+	@Test
+	public void testReadFileInSplits() throws IOException{
+
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+		rowOrcInputFormat.selectFields(0, 1);
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(4);
+		assertEquals(4, splits.length);
+		rowOrcInputFormat.openInputFormat();
+
+		long cnt = 0;
+		// read all splits
+		for (FileInputSplit split : splits) {
+
+			// open split
+			rowOrcInputFormat.open(split);
+			// read and count all rows
+			while (!rowOrcInputFormat.reachedEnd()) {
+				assertNotNull(rowOrcInputFormat.nextRecord(null));
+				cnt++;
+			}
+		}
+		// check that all rows have been read
+		assertEquals(1920800, cnt);
+	}
+
+	@Test
+	public void testReadFileWithFilter() throws IOException{
+
+		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+		rowOrcInputFormat.selectFields(0, 1);
+
+		// read head and tail of file
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Or(
+				new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L),
+				new OrcRowInputFormat.Not(
+					new OrcRowInputFormat.LessThanEquals("_col0", PredicateLeaf.Type.LONG, 1920000L))
+			));
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+
+		// open split
+		rowOrcInputFormat.open(splits[0]);
+
+		// read and count all rows
+		long cnt = 0;
+		while (!rowOrcInputFormat.reachedEnd()) {
+			assertNotNull(rowOrcInputFormat.nextRecord(null));
+			cnt++;
+		}
+		// check that only the first and last stripes of the file have been read.
+		// Each stripe has 5000 rows, except the last which has 800 rows.
+		assertEquals(5800, cnt);
+	}
+
+	@Test
+	public void testReadFileWithEvolvedSchema() throws IOException{
+
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_FLAT),
+			"struct<_col0:int,_col1:string,_col4:string,_col3:string>", // previous version of schema
+			new Configuration());
+		rowOrcInputFormat.selectFields(3, 0, 2);
+
+		rowOrcInputFormat.addPredicate(
+			new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L));
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+
+		// open split
+		rowOrcInputFormat.open(splits[0]);
+
+		// read and validate first row
+		assertFalse(rowOrcInputFormat.reachedEnd());
+		Row row = rowOrcInputFormat.nextRecord(null);
+		assertNotNull(row);
+		assertEquals(3, row.getArity());
+		assertEquals("Primary", row.getField(0));
+		assertEquals(1, row.getField(1));
+		assertEquals("M", row.getField(2));
+
+		// read and count remaining rows
+		long cnt = 1;
+		while (!rowOrcInputFormat.reachedEnd()) {
+			assertNotNull(rowOrcInputFormat.nextRecord(null));
+			cnt++;
+		}
+		// check that only the first and last stripes of the file have been read.
+		// Each stripe has 5000 rows, except the last which has 800 rows.
+		assertEquals(5000, cnt);
+	}
+
+	private String getPath(String fileName) {
+		return getClass().getClassLoader().getResource(fileName).getPath();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
index 3de6ab3..e6ef1e1 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
@@ -18,125 +18,101 @@
 
 package org.apache.flink.orc;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.Row;
 
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.net.URL;
-import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests for {@link OrcTableSource}.
  */
 public class OrcTableSourceITCase extends MultipleProgramsTestBase {
 
-	private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
-		"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-		"middle:struct<list:array<struct<int1:int,string1:string>>>," +
-		"list:array<struct<int1:int,string1:string>>," +
-		"map:map<string,struct<int1:int,string1:string>>>";
-
-	private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
-
-
-	private static final String[] TEST1_DATA = new String[] {
-		"false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
-		"true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
-			"[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
+	private static final String TEST_FILE_FLAT = "test-data-flat.orc";
+	private static final String TEST_SCHEMA_FLAT =
+		"struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>";
 
 	public OrcTableSourceITCase() {
 		super(TestExecutionMode.COLLECTION);
 	}
 
 	@Test
-	public void testOrcTableSource() throws Exception {
+	public void testFullScan() throws Exception {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
-		assert (test1URL != null);
-		OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
-
-		tEnv.registerTableSource("orcTable", orc);
-
-		String query = "Select * from orcTable";
-		Table t = tEnv.sql(query);
+		OrcTableSource orc = OrcTableSource.builder()
+			.path(getPath(TEST_FILE_FLAT))
+			.forOrcSchema(TEST_SCHEMA_FLAT)
+			.build();
+		tEnv.registerTableSource("OrcTable", orc);
+
+		String query =
+			"SELECT COUNT(*), " +
+				"MIN(_col0), MAX(_col0), " +
+				"MIN(_col1), MAX(_col1), " +
+				"MIN(_col2), MAX(_col2), " +
+				"MIN(_col3), MAX(_col3), " +
+				"MIN(_col4), MAX(_col4), " +
+				"MIN(_col5), MAX(_col5), " +
+				"MIN(_col6), MAX(_col6), " +
+				"MIN(_col7), MAX(_col7), " +
+				"MIN(_col8), MAX(_col8) " +
+			"FROM OrcTable";
+		Table t = tEnv.sqlQuery(query);
 
 		DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
-		List<Row> records = dataSet.collect();
-
-		Assert.assertEquals(records.size(), 2);
+		List<Row> result = dataSet.collect();
 
-		List<String> actualRecords = new ArrayList<>();
-		for (Row record : records) {
-			Assert.assertEquals(record.getArity(), 12);
-			actualRecords.add(record.toString());
-		}
-
-		Assert.assertThat(actualRecords, CoreMatchers.hasItems(TEST1_DATA));
+		assertEquals(1, result.size());
+		assertEquals(
+			"1920800,1,1920800,F,M,D,W,2 yr Degree,Unknown,500,10000,Good,Unknown,0,6,0,6,0,6",
+			result.get(0).toString());
 	}
 
 	@Test
-	public void testOrcTableProjection() throws Exception {
+	public void testScanWithProjectionAndFilter() throws Exception {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
-		assert(test1URL != null);
-		OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
-
-		tEnv.registerTableSource("orcTable", orc);
-
-		String query = "Select middle,list,map from orcTable";
-		Table t = tEnv.sql(query);
-
-		String[] colNames = new String[] {"middle", "list", "map"};
-
-		RowTypeInfo rowTypeInfo = new RowTypeInfo(
-			new TypeInformation[] {
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO},
-			new String[] {"int1", "string1"});
-
-		RowTypeInfo structTypeInfo = new RowTypeInfo(
-			new TypeInformation[] {ObjectArrayTypeInfo.getInfoFor(rowTypeInfo)},
-			new String[] {"list"});
-
-		TypeInformation[] colTypes = new TypeInformation[] {
-			structTypeInfo,
-			ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
-			new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
-		};
-
-		TableSchema actualTableSchema = new TableSchema(colNames, colTypes);
-
-		Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
-		Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
-		Assert.assertEquals(actualTableSchema.toString(), t.getSchema().toString());
+		OrcTableSource orc = OrcTableSource.builder()
+			.path(getPath(TEST_FILE_FLAT))
+			.forOrcSchema(TEST_SCHEMA_FLAT)
+			.build();
+		tEnv.registerTableSource("OrcTable", orc);
+
+		String query =
+			"SELECT " +
+				"MIN(_col4), MAX(_col4), " +
+				"MIN(_col3), MAX(_col3), " +
+				"MIN(_col0), MAX(_col0), " +
+				"MIN(_col2), MAX(_col2), " +
+				"COUNT(*) " +
+				"FROM OrcTable " +
+				"WHERE (_col0 BETWEEN 4975 and 5024 OR _col0 BETWEEN 9975 AND 10024) AND _col1 = 'F'";
+		Table t = tEnv.sqlQuery(query);
 
 		DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
-		List<Row> records = dataSet.collect();
-
-		Assert.assertEquals(records.size(), 2);
-		for (Row record: records) {
-			Assert.assertEquals(record.getArity(), 3);
-		}
+		List<Row> result = dataSet.collect();
 
+		assertEquals(1, result.size());
+		assertEquals(
+			"1500,6000,2 yr Degree,Unknown,4976,10024,D,W,50",
+			result.get(0).toString());
 	}
 
+	private String getPath(String fileName) {
+		return getClass().getClassLoader().getResource(fileName).getPath();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
index c285054..4e4be77 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
@@ -18,96 +18,248 @@
 
 package org.apache.flink.orc;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GetCompositeField;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.ItemAt;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.types.Row;
 
-import org.junit.Assert;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
-import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Unit Tests for {@link OrcTableSource}.
  */
 public class OrcTableSourceTest {
 
-	private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
-		"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-		"middle:struct<list:array<struct<int1:int,string1:string>>>," +
-		"list:array<struct<int1:int,string1:string>>," +
-		"map:map<string,struct<int1:int,string1:string>>>";
+	private static final String TEST_FILE_NESTED = "test-data-nested.orc";
+	private static final String TEST_SCHEMA_NESTED =
+		"struct<" +
+			"boolean1:boolean," +
+			"byte1:tinyint," +
+			"short1:smallint," +
+			"int1:int," +
+			"long1:bigint," +
+			"float1:float," +
+			"double1:double," +
+			"bytes1:binary," +
+			"string1:string," +
+			"middle:struct<" +
+				"list:array<" +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">" +
+			">," +
+			"list:array<" +
+				"struct<" +
+					"int1:int," +
+					"string1:string" +
+				">" +
+			">," +
+			"map:map<" +
+				"string," +
+				"struct<" +
+					"int1:int," +
+					"string1:string" +
+				">" +
+			">" +
+		">";
+
+	@Test
+	public void testGetReturnType() throws Exception {
+
+		OrcTableSource orc = OrcTableSource.builder()
+			.path(getPath(TEST_FILE_NESTED))
+			.forOrcSchema(TEST_SCHEMA_NESTED)
+			.build();
 
-	private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
+		TypeInformation<Row> returnType = orc.getReturnType();
+		assertNotNull(returnType);
+		assertTrue(returnType instanceof RowTypeInfo);
+		RowTypeInfo rowType = (RowTypeInfo) returnType;
+
+		RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
+		assertEquals(expected, rowType);
+	}
 
 	@Test
-	public void testOrcSchema() throws Exception {
+	public void testGetTableSchema() throws Exception {
+
+		OrcTableSource orc = OrcTableSource.builder()
+			.path(getPath(TEST_FILE_NESTED))
+			.forOrcSchema(TEST_SCHEMA_NESTED)
+			.build();
+
+		TableSchema schema = orc.getTableSchema();
+		assertNotNull(schema);
+		assertArrayEquals(getNestedFieldNames(), schema.getColumnNames());
+		assertArrayEquals(getNestedFieldTypes(), schema.getTypes());
+	}
+
+	@Test
+	public void testProjectFields() throws Exception {
+
+		OrcTableSource orc = OrcTableSource.builder()
+			.path(getPath(TEST_FILE_NESTED))
+			.forOrcSchema(TEST_SCHEMA_NESTED)
+			.build();
+
+		OrcTableSource projected = (OrcTableSource) orc.projectFields(new int[]{3, 5, 1, 0});
 
-		assert(test1URL != null);
-		OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
+		// ensure copy is returned
+		assertTrue(orc != projected);
 
-		String expectedSchema = "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer, long1: Long, " +
-			"float1: Float, double1: Double, bytes1: byte[], string1: String, " +
-			"middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>), " +
-			"list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>, " +
-			"map: Map<String, Row(int1: Integer, string1: String)>)";
+		// ensure table schema is identical
+		assertEquals(orc.getTableSchema(), projected.getTableSchema());
 
-		Assert.assertEquals(expectedSchema, orc.getReturnType().toString());
+		// ensure return type was adapted
+		String[] fieldNames = getNestedFieldNames();
+		TypeInformation[] fieldTypes = getNestedFieldTypes();
+		assertEquals(
+			Types.ROW_NAMED(
+				new String[] {fieldNames[3], fieldNames[5], fieldNames[1], fieldNames[0]},
+				new TypeInformation[] {fieldTypes[3], fieldTypes[5], fieldTypes[1], fieldTypes[0]}),
+			projected.getReturnType());
 
+		// ensure IF is configured with selected fields
+		OrcTableSource spyTS = spy(projected);
+		OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
+		doReturn(mockIF).when(spyTS).buildOrcInputFormat();
+		spyTS.getDataSet(mock(ExecutionEnvironment.class));
+		verify(mockIF).selectFields(eq(3), eq(5), eq(1), eq(0));
 	}
 
 	@Test
-	public void testOrcTableSchema() throws Exception {
+	public void testApplyPredicate() throws Exception {
 
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+		OrcTableSource orc = OrcTableSource.builder()
+			.path(getPath(TEST_FILE_NESTED))
+			.forOrcSchema(TEST_SCHEMA_NESTED)
+			.build();
 
-		assert(test1URL != null);
-		OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
+		// expressions for predicates
+		Expression pred1 = new GreaterThan(
+			new ResolvedFieldReference("int1", Types.INT),
+			new Literal(100, Types.INT));
+		Expression pred2 = new EqualTo(
+			new ResolvedFieldReference("string1", Types.STRING),
+			new Literal("hello", Types.STRING));
+		Expression pred3 = new EqualTo(
+			new GetCompositeField(
+				new ItemAt(
+					new ResolvedFieldReference(
+						"list",
+						ObjectArrayTypeInfo.getInfoFor(
+							Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING))),
+					new Literal(1, Types.INT)),
+				"int1"),
+			new Literal(1, Types.INT)
+			);
+		ArrayList<Expression> preds = new ArrayList<>();
+		preds.add(pred1);
+		preds.add(pred2);
+		preds.add(pred3);
 
-		tEnv.registerTableSource("orcTable", orc);
-		String query = "Select * from orcTable";
-		Table t = tEnv.sql(query);
+		// apply predicates on TableSource
+		OrcTableSource projected = (OrcTableSource) orc.applyPredicate(preds);
 
-		String[] colNames = new String[] {
-			"boolean1", "byte1", "short1", "int1", "long1", "float1",
-			"double1", "bytes1", "string1", "list", "list0", "map"
-		};
+		// ensure copy is returned
+		assertTrue(orc != projected);
 
-		RowTypeInfo rowTypeInfo = new RowTypeInfo(
-			new TypeInformation[] {
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO},
-			new String[] {"int1", "string1"});
-
-		TypeInformation[] colTypes = new TypeInformation[] {
-			BasicTypeInfo.BOOLEAN_TYPE_INFO,
-			BasicTypeInfo.BYTE_TYPE_INFO,
-			BasicTypeInfo.SHORT_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			BasicTypeInfo.FLOAT_TYPE_INFO,
-			BasicTypeInfo.DOUBLE_TYPE_INFO,
-			PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO,
-			ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
-			ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
-			new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
-		};
-		TableSchema expectedTableSchema = new TableSchema(colNames, colTypes);
+		// ensure table schema is identical
+		assertEquals(orc.getTableSchema(), projected.getTableSchema());
+
+		// ensure return type is identical
+		assertEquals(
+			Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
+			projected.getReturnType());
+
+		// ensure IF is configured with supported predicates
+		OrcTableSource spyTS = spy(projected);
+		OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
+		doReturn(mockIF).when(spyTS).buildOrcInputFormat();
+		spyTS.getDataSet(mock(ExecutionEnvironment.class));
+
+		ArgumentCaptor<OrcRowInputFormat.Predicate> arguments = ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
+		verify(mockIF, times(2)).addPredicate(arguments.capture());
+		List<String> values = arguments.getAllValues().stream().map(Object::toString).collect(Collectors.toList());
+		assertTrue(values.contains(
+			new OrcRowInputFormat.Not(new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100)).toString()));
+		assertTrue(values.contains(
+			new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello").toString()));
+
+		// ensure filter pushdown is correct
+		assertTrue(spyTS.isFilterPushedDown());
+		assertFalse(orc.isFilterPushedDown());
+	}
+
+	private String getPath(String fileName) {
+		return getClass().getClassLoader().getResource(fileName).getPath();
+	}
 
-		Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
-		Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
-		Assert.assertEquals(expectedTableSchema.toString(), t.getSchema().toString());
+	private String[] getNestedFieldNames() {
+		return new String[] {
+			"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map"
+		};
+	}
 
+	private TypeInformation[] getNestedFieldTypes() {
+		return new TypeInformation[]{
+			Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+			PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+			Types.ROW_NAMED(
+				new String[]{"list"},
+				ObjectArrayTypeInfo.getInfoFor(
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				)
+			),
+			ObjectArrayTypeInfo.getInfoFor(
+				Types.ROW_NAMED(
+					new String[]{"int1", "string1"},
+					Types.INT, Types.STRING
+				)
+			),
+			new MapTypeInfo<>(
+				Types.STRING,
+				Types.ROW_NAMED(
+					new String[]{"int1", "string1"},
+					Types.INT, Types.STRING
+				)
+			)
+		};
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
new file mode 100644
index 0000000..2cb1715
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrcUtils}.
+ *
+ */
+public class OrcUtilsTest {
+
+	@Test
+	public void testFlatSchemaToTypeInfo1() {
+
+		String schema =
+			"struct<" +
+				"boolean1:boolean," +
+				"byte1:tinyint," +
+				"short1:smallint," +
+				"int1:int," +
+				"long1:bigint," +
+				"float1:float," +
+				"double1:double," +
+				"bytes1:binary," +
+				"string1:string," +
+				"date1:date," +
+				"timestamp1:timestamp," +
+				"decimal1:decimal(5,2)" +
+			">";
+		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+		Assert.assertNotNull(typeInfo);
+		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+		// validate field types
+		Assert.assertArrayEquals(
+			new TypeInformation[]{
+				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+				Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
+			},
+			rowTypeInfo.getFieldTypes());
+
+		// validate field names
+		Assert.assertArrayEquals(
+			new String[] {
+				"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
+				"bytes1", "string1", "date1", "timestamp1", "decimal1"
+			},
+			rowTypeInfo.getFieldNames());
+
+	}
+
+	@Test
+	public void testNestedSchemaToTypeInfo1() {
+
+		String schema =
+			"struct<" +
+				"middle:struct<" +
+					"list:array<" +
+						"struct<" +
+							"int1:int," +
+							"string1:string" +
+						">" +
+					">" +
+				">," +
+				"list:array<" +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">," +
+				"map:map<" +
+					"string," +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">" +
+			">";
+		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+		Assert.assertNotNull(typeInfo);
+		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+		// validate field types
+		Assert.assertArrayEquals(
+			new TypeInformation[]{
+				Types.ROW_NAMED(
+					new String[]{"list"},
+					ObjectArrayTypeInfo.getInfoFor(
+						Types.ROW_NAMED(
+							new String[]{"int1", "string1"},
+							Types.INT, Types.STRING
+						)
+					)
+				),
+				ObjectArrayTypeInfo.getInfoFor(
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				),
+				new MapTypeInfo<>(
+					Types.STRING,
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				)
+			},
+			rowTypeInfo.getFieldTypes());
+
+		// validate field names
+		Assert.assertArrayEquals(
+			new String[] {"middle", "list", "map"},
+			rowTypeInfo.getFieldNames());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
deleted file mode 100644
index 60008a0..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Tests for the {@link RowOrcInputFormat}.
- */
-
-public class RowOrcInputFormatTest {
-
-	private RowOrcInputFormat rowOrcInputFormat;
-
-	@After
-	public void tearDown() throws IOException {
-		if (rowOrcInputFormat != null) {
-			rowOrcInputFormat.close();
-			rowOrcInputFormat.closeInputFormat();
-		}
-		rowOrcInputFormat = null;
-	}
-
-	private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
-
-	private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
-		"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-		"middle:struct<list:array<struct<int1:int,string1:string>>>," +
-		"list:array<struct<int1:int,string1:string>>," +
-		"map:map<string,struct<int1:int,string1:string>>>";
-
-	private static final String[] TEST1_DATA = new String[] {
-		"false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
-		"true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
-			"[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
-
-	private static final String[] TEST1_PROJECTED_DATA = new String[] {
-		"{},[3,good, 4,bad],[1,bye, 2,sigh],hi,[0, 1, 2, 3, 4],-15.0,1.0,9223372036854775807,65536,1024,1,false",
-		"{chani=5,chani, mauddib=1,mauddib},[100000000,cat, -100000,in, 1234,hat],[1,bye, 2,sigh],bye," +
-			"[],-5.0,2.0,9223372036854775807,65536,2048,100,true" };
-
-	private static final String TEST1_INVALID_SCHEMA = "struct<boolean1:int,byte1:tinyint,short1:smallint,int1:int," +
-		"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-		"middle:struct<list:array<struct<int1:int,string1:string>>>," +
-		"list:array<struct<int1:int,string1:string>>," +
-		"map:map<string,struct<int1:int,string1:string>>>";
-
-	@Test(expected = FileNotFoundException.class)
-	public void testInvalidPath() throws IOException{
-
-		rowOrcInputFormat = new RowOrcInputFormat("TestOrcFile.test2.orc", TEST1_SCHEMA, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-		rowOrcInputFormat.open(inputSplits[0]);
-
-	}
-
-	@Test(expected = RuntimeException.class)
-	public void testInvalidSchema() throws IOException{
-
-		assert(test1URL != null);
-		rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_INVALID_SCHEMA, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-		rowOrcInputFormat.open(inputSplits[0]);
-
-	}
-
-	@Test(expected = IndexOutOfBoundsException.class)
-	public void testInvalidProjection() throws IOException{
-
-		assert(test1URL != null);
-		rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-		int[] projectionMask = {14};
-		rowOrcInputFormat.setFieldMapping(projectionMask);
-	}
-
-	@Test
-	public void testMajorDataTypes() throws IOException{
-
-		// test for boolean,byte,short,int,long,float,double,bytes,string,struct,list,map
-		assert(test1URL != null);
-		rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = null;
-		int count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				Assert.assertEquals(row.toString(), TEST1_DATA[count++]);
-			}
-		}
-	}
-
-	@Test
-	public void testProjection() throws IOException{
-
-		assert(test1URL != null);
-		rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-		int[] projectionMask = {11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
-		rowOrcInputFormat.setFieldMapping(projectionMask);
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = null;
-		int count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				Assert.assertEquals(row.toString(), TEST1_PROJECTED_DATA[count++]);
-			}
-		}
-
-	}
-
-	@Test
-	public void testTimeStampAndDate() throws IOException{
-
-		URL expectedDataURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.dat");
-		assert(expectedDataURL != null);
-		List<String> expectedTimeStampAndDate = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
-
-		URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<time:timestamp,date:date>";
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		List<Object> actualTimeStampAndDate = new ArrayList<>();
-
-		Row row = null;
-		int count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				count++;
-				if (count <= 10000) {
-					actualTimeStampAndDate.add(row.getField(0) + "," + row.getField(1));
-				}
-
-			}
-		}
-		Assert.assertEquals(count, 70000);
-		Assert.assertEquals(expectedTimeStampAndDate.size(), actualTimeStampAndDate.size());
-		Assert.assertEquals(expectedTimeStampAndDate.toString(), actualTimeStampAndDate.toString());
-
-	}
-
-	@Test
-	public void testDecimal() throws IOException{
-
-		URL expectedDataURL = getClass().getClassLoader().getResource("decimal.dat");
-		List<String> expectedDecimal = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
-
-		URL testInputURL = getClass().getClassLoader().getResource("decimal.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<_col0:decimal(10,5)>";
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		List<Object> actualDecimal = new ArrayList<>();
-
-		Row row = null;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				actualDecimal.add(row.getField(0));
-			}
-		}
-
-		Assert.assertEquals(expectedDecimal.size(), actualDecimal.size());
-		Assert.assertEquals(expectedDecimal.toString(), actualDecimal.toString());
-
-	}
-
-	@Test
-	public void testEmptyFile() throws IOException{
-
-		URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.emptyFile.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-
-		rowOrcInputFormat = new RowOrcInputFormat(path, TEST1_SCHEMA, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = new Row(1);
-		int count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				count++;
-			}
-		}
-
-		Assert.assertEquals(count, 0);
-	}
-
-	@Test
-	public void testLargeFile() throws IOException{
-
-		URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
-			"_col5:string,_col6:int,_col7:int,_col8:int>";
-
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = new Row(1);
-		int count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				count++;
-			}
-		}
-
-		Assert.assertEquals(count, 1920800);
-	}
-
-	@Test
-	public void testProducedType() throws IOException{
-
-		assert(test1URL != null);
-		rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		rowOrcInputFormat.open(inputSplits[0]);
-
-		TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
-		Assert.assertEquals(type.toString(), "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer," +
-			" long1: Long, float1: Float, double1: Double, bytes1: byte[], string1: String," +
-			" middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
-			" list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
-			" map: Map<String, Row(int1: Integer, string1: String)>)");
-
-	}
-
-	@Test
-	public void testProducedTypeWithProjection() throws IOException{
-
-		assert(test1URL != null);
-		rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-		int[] projectionMask = {9, 10, 11};
-		rowOrcInputFormat.setFieldMapping(projectionMask);
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		rowOrcInputFormat.open(inputSplits[0]);
-
-		TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
-		Assert.assertEquals(type.toString(), "Row(middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
-			" list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
-			" map: Map<String, Row(int1: Integer, string1: String)>)");
-
-	}
-
-	@Test
-	public void testLongList() throws Exception {
-
-		URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listlong.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<mylist1:array<bigint>>";
-
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = null;
-		long count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				Assert.assertEquals(row.getArity(), 1);
-				Object object = row.getField(0);
-				long[] l = (long[]) object;
-
-				Assert.assertEquals(l.length, 2);
-				if (count < 50) {
-					Assert.assertArrayEquals(l, new long[]{count, count + 1});
-				}
-				else {
-					Assert.assertArrayEquals(l, new long[]{0L, 0L});
-				}
-				count = count + 2;
-			}
-		}
-		Assert.assertEquals(count, 100);
-	}
-
-	@Test
-	public void testStringList() throws Exception {
-
-		URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.liststring.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<mylist1:array<string>>";
-
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = null;
-		long count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				Assert.assertEquals(row.getArity(), 1);
-				Object object = row.getField(0);
-				String[] l = (String[]) object;
-
-				Assert.assertEquals(l.length, 2);
-				Assert.assertArrayEquals(l, new String[]{"hello" + count, "hello" + (count + 1) });
-				count = count + 2;
-			}
-		}
-		Assert.assertEquals(count, 200);
-	}
-
-	@Test
-	public void testListOfListOfStructOfLong() throws Exception {
-		URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listliststructlong.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
-
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
-		rowOrcInputFormat.openInputFormat();
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-		Assert.assertEquals(inputSplits.length, 1);
-
-		Row row = null;
-		long count = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			while (!rowOrcInputFormat.reachedEnd()) {
-
-				row = rowOrcInputFormat.nextRecord(row);
-				Assert.assertEquals(row.getArity(), 1);
-
-				Object[] objects = (Object[]) row.getField(0);
-				Assert.assertEquals(objects.length, 1);
-
-				Object[] objects1 = (Object[]) objects[0];
-				Assert.assertEquals(objects1.length, 1);
-
-				Row[] nestedRows = Arrays.copyOf(objects1, objects1.length, Row[].class);
-				Assert.assertEquals(nestedRows.length, 1);
-
-				Assert.assertEquals(nestedRows[0].getArity(), 1);
-
-				Assert.assertEquals(nestedRows[0].getField(0), count);
-
-				count++;
-			}
-		}
-		Assert.assertEquals(count, 100);
-	}
-
-	@Test
-	public void testSplit() throws IOException{
-
-		URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
-		assert(testInputURL != null);
-		String path = testInputURL.getPath();
-		String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
-			"_col5:string,_col6:int,_col7:int,_col8:int>";
-
-		rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-		rowOrcInputFormat.openInputFormat();
-
-		FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(10);
-
-		Assert.assertEquals(inputSplits.length, 10);
-
-		Row row = null;
-		int countTotalRecords = 0;
-		for (FileInputSplit split : inputSplits) {
-			rowOrcInputFormat.open(split);
-			int countSplitRecords = 0;
-			while (!rowOrcInputFormat.reachedEnd()) {
-				row = rowOrcInputFormat.nextRecord(row);
-				countSplitRecords++;
-			}
-			Assert.assertNotEquals(countSplitRecords, 1920800);
-			countTotalRecords += countSplitRecords;
-		}
-
-		Assert.assertEquals(countTotalRecords, 1920800);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc
deleted file mode 100644
index ecdadcb..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc
deleted file mode 100644
index 0f3f9c8..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc
deleted file mode 100644
index 648ea18..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc
deleted file mode 100644
index 75a5f2a..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc
deleted file mode 100644
index 4fb0bef..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc and /dev/null differ