Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/22 22:10:44 UTC
[7/9] flink git commit: [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.
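For reference, a minimal usage sketch of the two new classes, assembled from the APIs exercised by the tests in this commit (the OrcRowInputFormat constructor, selectFields, addPredicate, and the OrcTableSource builder). The wrapper class name, file path, and ORC schema string are illustrative placeholders, not part of the commit.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.orc.OrcRowInputFormat;
    import org.apache.flink.orc.OrcTableSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

    // Sketch only: class name, file path, and schema are illustrative.
    public class OrcUsageSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // DataSet API: read ORC rows directly with the new OrcRowInputFormat.
            OrcRowInputFormat orcIF = new OrcRowInputFormat(
                "/path/to/data.orc",
                "struct<_col0:int,_col1:string>",
                new Configuration());
            orcIF.selectFields(0, 1);   // projection push-down
            orcIF.addPredicate(         // filter push-down
                new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
            DataSet<Row> rows = env.createInput(orcIF);
            rows.print();

            // Table API: expose the same file as a table via the new OrcTableSource builder.
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
            OrcTableSource orcTS = OrcTableSource.builder()
                .path("/path/to/data.orc")
                .forOrcSchema("struct<_col0:int,_col1:string>")
                .build();
            tEnv.registerTableSource("OrcTable", orcTS);
            Table t = tEnv.sqlQuery("SELECT _col0, _col1 FROM OrcTable");
            tEnv.toDataSet(t, Row.class).print();
        }
    }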
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
deleted file mode 100644
index 0c9c549..0000000
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static org.apache.flink.orc.OrcUtils.fillRows;
-
-/**
- * InputFormat to read ORC data.
- * For efficiency, rows are read in batches rather than one at a time.
- */
-public class RowOrcInputFormat
- extends FileInputFormat<Row>
- implements ResultTypeQueryable<Row> {
-
- private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
- private static final int BATCH_SIZE = 1024;
-
- private org.apache.hadoop.conf.Configuration config;
- private TypeDescription schema;
- private int[] fieldMapping;
-
- private transient RowTypeInfo rowType;
- private transient RecordReader orcRowsReader;
- private transient VectorizedRowBatch rowBatch;
- private transient Row[] rows;
-
- private transient int rowInBatch;
-
- public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
- this(path, TypeDescription.fromString(schemaString), orcConfig);
- }
-
- public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
- super(new Path(path));
- this.unsplittable = false;
- this.schema = orcSchema;
- this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
- this.config = orcConfig;
-
- this.fieldMapping = new int[this.schema.getChildren().size()];
- for (int i = 0; i < fieldMapping.length; i++) {
- this.fieldMapping[i] = i;
- }
-
- }
-
- public void setFieldMapping(int[] fieldMapping) {
- this.fieldMapping = fieldMapping;
- // adapt result type
-
- TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length];
- String[] fieldNames = new String[fieldMapping.length];
- for (int i = 0; i < fieldMapping.length; i++) {
- fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
- fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]];
- }
- this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
- }
-
- private boolean[] computeProjectionMask() {
- boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
- for (int inIdx : fieldMapping) {
- TypeDescription fieldSchema = schema.getChildren().get(inIdx);
- for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
- projectionMask[i] = true;
- }
- }
- return projectionMask;
- }
-
- @Override
- public void openInputFormat() throws IOException {
- super.openInputFormat();
- this.rows = new Row[BATCH_SIZE];
- for (int i = 0; i < BATCH_SIZE; i++) {
- rows[i] = new Row(fieldMapping.length);
- }
- }
-
- @Override
- public void open(FileInputSplit fileSplit) throws IOException {
-
- this.currentSplit = fileSplit;
- Preconditions.checkArgument(this.splitStart == 0, "ORC files must be read from the start.");
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening ORC file " + fileSplit.getPath());
- }
-
- org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
-
- Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(config));
-
- Reader.Options options = orcReader.options()
- .range(fileSplit.getStart(), fileSplit.getLength())
- .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(config))
- .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(config))
- .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(config));
-
- options.include(computeProjectionMask());
-
- // check that schema of file is as expected
- if (!this.schema.equals(orcReader.getSchema())) {
-
- throw new RuntimeException("Invalid schema for file at " + this.filePath +
- " Expected:" + this.schema + " Actual: " + orcReader.getSchema());
- }
-
- this.orcRowsReader = orcReader.rows(options);
-
- // assign ids
- this.schema.getId();
-
- this.rowBatch = schema.createRowBatch(BATCH_SIZE);
- rowInBatch = 0;
- }
-
- @Override
- public void close() throws IOException {
-
- if (orcRowsReader != null) {
- this.orcRowsReader.close();
- }
- this.orcRowsReader = null;
-
- }
-
- @Override
- public void closeInputFormat() throws IOException {
- this.rows = null;
- this.rows = null;
- this.schema = null;
- this.rowBatch = null;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !ensureBatch();
- }
-
- private boolean ensureBatch() throws IOException {
-
- if (rowInBatch >= rowBatch.size) {
- rowInBatch = 0;
- boolean moreRows = orcRowsReader.nextBatch(rowBatch);
-
- if (moreRows) {
- // read rows
- fillRows(rows, schema, rowBatch, fieldMapping);
- }
- return moreRows;
- }
-
- return true;
- }
-
- @Override
- public Row nextRecord(Row reuse) throws IOException {
- return rows[this.rowInBatch++];
- }
-
- @Override
- public TypeInformation<Row> getProducedType() {
- return rowType;
- }
-
- // --------------------------------------------------------------------------------------------
- // Custom serialization methods
- // --------------------------------------------------------------------------------------------
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- this.config.write(out);
- out.writeUTF(schema.toString());
-
- out.writeInt(fieldMapping.length);
- for (int f : fieldMapping) {
- out.writeInt(f);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-
- org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
- configuration.readFields(in);
-
- if (this.config == null) {
- this.config = configuration;
- }
- this.schema = TypeDescription.fromString(in.readUTF());
-
- this.fieldMapping = new int[in.readInt()];
- for (int i = 0; i < fieldMapping.length; i++) {
- this.fieldMapping[i] = in.readInt();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
new file mode 100644
index 0000000..0efe41f
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link OrcRowInputFormat}.
+ */
+public class OrcRowInputFormatTest {
+
+ private OrcRowInputFormat rowOrcInputFormat;
+
+ @After
+ public void tearDown() throws IOException {
+ if (rowOrcInputFormat != null) {
+ rowOrcInputFormat.close();
+ rowOrcInputFormat.closeInputFormat();
+ }
+ rowOrcInputFormat = null;
+ }
+
+ private static final String TEST_FILE_FLAT = "test-data-flat.orc";
+ private static final String TEST_SCHEMA_FLAT =
+ "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>";
+
+ private static final String TEST_FILE_NESTED = "test-data-nested.orc";
+ private static final String TEST_SCHEMA_NESTED =
+ "struct<" +
+ "boolean1:boolean," +
+ "byte1:tinyint," +
+ "short1:smallint," +
+ "int1:int," +
+ "long1:bigint," +
+ "float1:float," +
+ "double1:double," +
+ "bytes1:binary," +
+ "string1:string," +
+ "middle:struct<" +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">," +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">," +
+ "map:map<" +
+ "string," +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">";
+
+ private static final String TEST_FILE_TIMETYPES = "test-data-timetypes.orc";
+ private static final String TEST_SCHEMA_TIMETYPES = "struct<time:timestamp,date:date>";
+
+ private static final String TEST_FILE_DECIMAL = "test-data-decimal.orc";
+ private static final String TEST_SCHEMA_DECIMAL = "struct<_col0:decimal(10,5)>";
+
+ private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
+ private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
+
+ @Test(expected = FileNotFoundException.class)
+ public void testInvalidPath() throws IOException{
+ rowOrcInputFormat =
+ new OrcRowInputFormat("/does/not/exist", TEST_SCHEMA_FLAT, new Configuration());
+ rowOrcInputFormat.openInputFormat();
+ FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
+ rowOrcInputFormat.open(inputSplits[0]);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testInvalidProjection1() throws IOException{
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+ int[] projectionMask = {1, 2, 3, -1};
+ rowOrcInputFormat.selectFields(projectionMask);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testInvalidProjection2() throws IOException{
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+ int[] projectionMask = {1, 2, 3, 9};
+ rowOrcInputFormat.selectFields(projectionMask);
+ }
+
+ @Test
+ public void testProjectionMaskNested() throws IOException{
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+ // mock options to check configuration of ORC reader
+ Reader.Options options = new Reader.Options();
+ doReturn(options).when(spy).getOptions(any());
+
+ spy.selectFields(9, 11, 2);
+ spy.openInputFormat();
+ FileInputSplit[] splits = spy.createInputSplits(1);
+ spy.open(splits[0]);
+
+ // top-level struct is false
+ boolean[] expected = new boolean[]{
+ false, // top level
+ false, false, // flat fields 0, 1 are out
+ true, // flat field 2 is in
+ false, false, false, false, false, false, // flat fields 3, 4, 5, 6, 7, 8 are out
+ true, true, true, true, true, // nested field 9 is in
+ false, false, false, false, // nested field 10 is out
+ true, true, true, true, true}; // nested field 11 is in
+ assertArrayEquals(expected, options.getInclude());
+ }
+
+ @Test
+ public void testSplitStripesGivenSplits() throws IOException {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+ OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+ // mock options to check configuration of ORC reader
+ Reader.Options options = spy(new Reader.Options());
+ doReturn(options).when(spy).getOptions(any());
+
+ FileInputSplit[] splits = spy.createInputSplits(3);
+
+ spy.openInputFormat();
+ spy.open(splits[0]);
+ verify(options).range(eq(3L), eq(137005L));
+ spy.open(splits[1]);
+ verify(options).range(eq(137008L), eq(136182L));
+ spy.open(splits[2]);
+ verify(options).range(eq(273190L), eq(123633L));
+ }
+
+ @Test
+ public void testSplitStripesCustomSplits() throws IOException {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+ OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+ // mock list of stripes
+ List<StripeInformation> stripes = new ArrayList<>();
+ StripeInformation stripe1 = mock(StripeInformation.class);
+ when(stripe1.getOffset()).thenReturn(10L);
+ when(stripe1.getLength()).thenReturn(90L);
+ StripeInformation stripe2 = mock(StripeInformation.class);
+ when(stripe2.getOffset()).thenReturn(100L);
+ when(stripe2.getLength()).thenReturn(100L);
+ StripeInformation stripe3 = mock(StripeInformation.class);
+ when(stripe3.getOffset()).thenReturn(200L);
+ when(stripe3.getLength()).thenReturn(100L);
+ StripeInformation stripe4 = mock(StripeInformation.class);
+ when(stripe4.getOffset()).thenReturn(300L);
+ when(stripe4.getLength()).thenReturn(100L);
+ StripeInformation stripe5 = mock(StripeInformation.class);
+ when(stripe5.getOffset()).thenReturn(400L);
+ when(stripe5.getLength()).thenReturn(100L);
+ stripes.add(stripe1);
+ stripes.add(stripe2);
+ stripes.add(stripe3);
+ stripes.add(stripe4);
+ stripes.add(stripe5);
+ doReturn(stripes).when(spy).getStripes(any());
+
+ // mock options to check configuration of ORC reader
+ Reader.Options options = spy(new Reader.Options());
+ doReturn(options).when(spy).getOptions(any());
+
+ spy.openInputFormat();
+ // split ranging 2 stripes
+ spy.open(new FileInputSplit(0, new Path(getPath(TEST_FILE_FLAT)), 0, 150, new String[]{}));
+ verify(options).range(eq(10L), eq(190L));
+ // split ranging 0 stripes
+ spy.open(new FileInputSplit(1, new Path(getPath(TEST_FILE_FLAT)), 150, 10, new String[]{}));
+ verify(options).range(eq(0L), eq(0L));
+ // split ranging 1 stripe
+ spy.open(new FileInputSplit(2, new Path(getPath(TEST_FILE_FLAT)), 160, 41, new String[]{}));
+ verify(options).range(eq(200L), eq(100L));
+ // split ranging 2 stripes
+ spy.open(new FileInputSplit(3, new Path(getPath(TEST_FILE_FLAT)), 201, 299, new String[]{}));
+ verify(options).range(eq(300L), eq(200L));
+ }
+
+ @Test
+ public void testProducedType() throws IOException {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo);
+ RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType();
+
+ assertArrayEquals(
+ new TypeInformation[]{
+ // primitives
+ Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+ // binary
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+ // string
+ Types.STRING,
+ // struct
+ Types.ROW_NAMED(
+ new String[]{"list"},
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))),
+ // list
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING)),
+ // map
+ new MapTypeInfo<>(Types.STRING, Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))
+ },
+ producedType.getFieldTypes());
+ assertArrayEquals(
+ new String[]{"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map"},
+ producedType.getFieldNames());
+ }
+
+ @Test
+ public void testProducedTypeWithProjection() throws IOException {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ rowOrcInputFormat.selectFields(9, 3, 7, 10);
+
+ assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo);
+ RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType();
+
+ assertArrayEquals(
+ new TypeInformation[]{
+ // struct
+ Types.ROW_NAMED(
+ new String[]{"list"},
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))),
+ // int
+ Types.INT,
+ // binary
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+ // list
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))
+ },
+ producedType.getFieldTypes());
+ assertArrayEquals(
+ new String[]{"middle", "int1", "bytes1", "list"},
+ producedType.getFieldNames());
+ }
+
+ @Test
+ public void testSerialization() throws Exception {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+ rowOrcInputFormat.selectFields(0, 4, 1);
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
+
+ byte[] bytes = InstantiationUtil.serializeObject(rowOrcInputFormat);
+ OrcRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
+
+ FileInputSplit[] splits = copy.createInputSplits(1);
+ copy.openInputFormat();
+ copy.open(splits[0]);
+ assertFalse(copy.reachedEnd());
+ Row row = copy.nextRecord(null);
+
+ assertNotNull(row);
+ assertEquals(3, row.getArity());
+ // check first row
+ assertEquals(1, row.getField(0));
+ assertEquals(500, row.getField(1));
+ assertEquals("M", row.getField(2));
+ }
+
+ @Test
+ public void testNumericBooleanStringPredicates() throws Exception {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ rowOrcInputFormat.selectFields(0, 1, 2, 3, 4, 5, 6, 8);
+
+ // boolean pred
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("boolean1", PredicateLeaf.Type.BOOLEAN, false));
+ // byte pred (compared as long)
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.LessThan("byte1", PredicateLeaf.Type.LONG, 1));
+ // short pred (compared as long)
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.LessThanEquals("short1", PredicateLeaf.Type.LONG, 1024));
+ // int range pred (compared as long)
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Between("int1", PredicateLeaf.Type.LONG, -1, 65536));
+ // long pred
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("long1", PredicateLeaf.Type.LONG, 9223372036854775807L));
+ // float pred
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("float1", PredicateLeaf.Type.FLOAT, 1.0));
+ // double pred (compared as float)
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("double1", PredicateLeaf.Type.FLOAT, -15.0));
+ // string null pred
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.IsNull("string1", PredicateLeaf.Type.STRING));
+ // string pred
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello"));
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ rowOrcInputFormat.openInputFormat();
+
+ // mock options to check configuration of ORC reader
+ OrcRowInputFormat spy = spy(rowOrcInputFormat);
+ Reader.Options options = new Reader.Options();
+ doReturn(options).when(spy).getOptions(any());
+
+ spy.openInputFormat();
+ spy.open(splits[0]);
+
+ // verify predicate configuration
+ SearchArgument sarg = options.getSearchArgument();
+ assertNotNull(sarg);
+ assertEquals("(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 leaf-6 leaf-7 leaf-8)", sarg.getExpression().toString());
+ assertEquals(9, sarg.getLeaves().size());
+ List<PredicateLeaf> leaves = sarg.getLeaves();
+ assertEquals("(EQUALS boolean1 false)", leaves.get(0).toString());
+ assertEquals("(LESS_THAN byte1 1)", leaves.get(1).toString());
+ assertEquals("(LESS_THAN_EQUALS short1 1024)", leaves.get(2).toString());
+ assertEquals("(BETWEEN int1 -1 65536)", leaves.get(3).toString());
+ assertEquals("(EQUALS long1 9223372036854775807)", leaves.get(4).toString());
+ assertEquals("(EQUALS float1 1.0)", leaves.get(5).toString());
+ assertEquals("(EQUALS double1 -15.0)", leaves.get(6).toString());
+ assertEquals("(IS_NULL string1)", leaves.get(7).toString());
+ assertEquals("(EQUALS string1 hello)", leaves.get(8).toString());
+ }
+
+ @Test
+ public void testTimePredicates() throws Exception {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
+
+ rowOrcInputFormat.addPredicate(
+ // OR
+ new OrcRowInputFormat.Or(
+ // timestamp pred
+ new OrcRowInputFormat.Equals("time", PredicateLeaf.Type.TIMESTAMP, Timestamp.valueOf("1900-05-05 12:34:56.100")),
+ // date pred
+ new OrcRowInputFormat.Equals("date", PredicateLeaf.Type.DATE, Date.valueOf("1900-12-25")))
+ );
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ rowOrcInputFormat.openInputFormat();
+
+ // mock options to check configuration of ORC reader
+ OrcRowInputFormat spy = spy(rowOrcInputFormat);
+ Reader.Options options = new Reader.Options();
+ doReturn(options).when(spy).getOptions(any());
+
+ spy.openInputFormat();
+ spy.open(splits[0]);
+
+ // verify predicate configuration
+ SearchArgument sarg = options.getSearchArgument();
+ assertNotNull(sarg);
+ assertEquals("(or leaf-0 leaf-1)", sarg.getExpression().toString());
+ assertEquals(2, sarg.getLeaves().size());
+ List<PredicateLeaf> leaves = sarg.getLeaves();
+ assertEquals("(EQUALS time 1900-05-05 12:34:56.1)", leaves.get(0).toString());
+ assertEquals("(EQUALS date 1900-12-25)", leaves.get(1).toString());
+ }
+
+ @Test
+ public void testDecimalPredicate() throws Exception {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
+
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Not(
+ // decimal pred
+ new OrcRowInputFormat.Equals("_col0", PredicateLeaf.Type.DECIMAL, BigDecimal.valueOf(-1000.5))));
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ rowOrcInputFormat.openInputFormat();
+
+ // mock options to check configuration of ORC reader
+ OrcRowInputFormat spy = spy(rowOrcInputFormat);
+ Reader.Options options = new Reader.Options();
+ doReturn(options).when(spy).getOptions(any());
+
+ spy.openInputFormat();
+ spy.open(splits[0]);
+
+ // verify predicate configuration
+ SearchArgument sarg = options.getSearchArgument();
+ assertNotNull(sarg);
+ assertEquals("(not leaf-0)", sarg.getExpression().toString());
+ assertEquals(1, sarg.getLeaves().size());
+ List<PredicateLeaf> leaves = sarg.getLeaves();
+ assertEquals("(EQUALS _col0 -1000.5)", leaves.get(0).toString());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPredicateWithInvalidColumn() throws Exception {
+ rowOrcInputFormat =
+ new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("unknown", PredicateLeaf.Type.LONG, 42));
+ }
+
+ @Test
+ public void testReadNestedFile() throws IOException{
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ Row row = rowOrcInputFormat.nextRecord(null);
+
+ // validate first row
+ assertNotNull(row);
+ assertEquals(12, row.getArity());
+ assertEquals(false, row.getField(0));
+ assertEquals((byte) 1, row.getField(1));
+ assertEquals((short) 1024, row.getField(2));
+ assertEquals(65536, row.getField(3));
+ assertEquals(9223372036854775807L, row.getField(4));
+ assertEquals(1.0f, row.getField(5));
+ assertEquals(-15.0d, row.getField(6));
+ assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(7));
+ assertEquals("hi", row.getField(8));
+ // check nested field
+ assertTrue(row.getField(9) instanceof Row);
+ Row nested1 = (Row) row.getField(9);
+ assertEquals(1, nested1.getArity());
+ assertTrue(nested1.getField(0) instanceof Object[]);
+ Object[] nestedList1 = (Object[]) nested1.getField(0);
+ assertEquals(2, nestedList1.length);
+ assertEquals(Row.of(1, "bye"), nestedList1[0]);
+ assertEquals(Row.of(2, "sigh"), nestedList1[1]);
+ // check list
+ assertTrue(row.getField(10) instanceof Object[]);
+ Object[] list1 = (Object[]) row.getField(10);
+ assertEquals(2, list1.length);
+ assertEquals(Row.of(3, "good"), list1[0]);
+ assertEquals(Row.of(4, "bad"), list1[1]);
+ // check map
+ assertTrue(row.getField(11) instanceof HashMap);
+ HashMap map1 = (HashMap) row.getField(11);
+ assertEquals(0, map1.size());
+
+ // read second row
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ row = rowOrcInputFormat.nextRecord(null);
+
+ // validate second row
+ assertNotNull(row);
+ assertEquals(12, row.getArity());
+ assertEquals(true, row.getField(0));
+ assertEquals((byte) 100, row.getField(1));
+ assertEquals((short) 2048, row.getField(2));
+ assertEquals(65536, row.getField(3));
+ assertEquals(9223372036854775807L, row.getField(4));
+ assertEquals(2.0f, row.getField(5));
+ assertEquals(-5.0d, row.getField(6));
+ assertArrayEquals(new byte[]{}, (byte[]) row.getField(7));
+ assertEquals("bye", row.getField(8));
+ // check nested field
+ assertTrue(row.getField(9) instanceof Row);
+ Row nested2 = (Row) row.getField(9);
+ assertEquals(1, nested2.getArity());
+ assertTrue(nested2.getField(0) instanceof Object[]);
+ Object[] nestedList2 = (Object[]) nested2.getField(0);
+ assertEquals(2, nestedList2.length);
+ assertEquals(Row.of(1, "bye"), nestedList2[0]);
+ assertEquals(Row.of(2, "sigh"), nestedList2[1]);
+ // check list
+ assertTrue(row.getField(10) instanceof Object[]);
+ Object[] list2 = (Object[]) row.getField(10);
+ assertEquals(3, list2.length);
+ assertEquals(Row.of(100000000, "cat"), list2[0]);
+ assertEquals(Row.of(-100000, "in"), list2[1]);
+ assertEquals(Row.of(1234, "hat"), list2[2]);
+ // check map
+ assertTrue(row.getField(11) instanceof HashMap);
+ HashMap map = (HashMap) row.getField(11);
+ assertEquals(2, map.size());
+ assertEquals(Row.of(5, "chani"), map.get("chani"));
+ assertEquals(Row.of(1, "mauddib"), map.get("mauddib"));
+
+ assertTrue(rowOrcInputFormat.reachedEnd());
+ }
+
+ @Test
+ public void testReadTimeTypeFile() throws IOException{
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ Row row = rowOrcInputFormat.nextRecord(null);
+
+ // validate first row
+ assertNotNull(row);
+ assertEquals(2, row.getArity());
+ assertEquals(Timestamp.valueOf("1900-05-05 12:34:56.1"), row.getField(0));
+ assertEquals(Date.valueOf("1900-12-25"), row.getField(1));
+
+ // check correct number of rows
+ long cnt = 1;
+ while (!rowOrcInputFormat.reachedEnd()) {
+ assertNotNull(rowOrcInputFormat.nextRecord(null));
+ cnt++;
+ }
+ assertEquals(70000, cnt);
+ }
+
+ @Test
+ public void testReadDecimalTypeFile() throws IOException{
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ Row row = rowOrcInputFormat.nextRecord(null);
+
+ // validate first row
+ assertNotNull(row);
+ assertEquals(1, row.getArity());
+ assertEquals(BigDecimal.valueOf(-1000.5d), row.getField(0));
+
+ // check correct number of rows
+ long cnt = 1;
+ while (!rowOrcInputFormat.reachedEnd()) {
+ assertNotNull(rowOrcInputFormat.nextRecord(null));
+ cnt++;
+ }
+ assertEquals(6000, cnt);
+ }
+
+ @Test
+ public void testReadNestedListFile() throws Exception {
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTEDLIST), TEST_SCHEMA_NESTEDLIST, new Configuration());
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+
+ Row row = null;
+ long cnt = 0;
+
+ // read all rows
+ while (!rowOrcInputFormat.reachedEnd()) {
+
+ row = rowOrcInputFormat.nextRecord(row);
+ assertEquals(1, row.getArity());
+
+ // outer list
+ Object[] list = (Object[]) row.getField(0);
+ assertEquals(1, list.length);
+
+ // nested list of rows
+ Row[] nestedRows = (Row[]) list[0];
+ assertEquals(1, nestedRows.length);
+ assertEquals(1, nestedRows[0].getArity());
+
+ // verify list value
+ assertEquals(cnt, nestedRows[0].getField(0));
+ cnt++;
+ }
+ // number of rows in file
+ assertEquals(100, cnt);
+ }
+
+ @Test
+ public void testReadWithProjection() throws IOException{
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+ rowOrcInputFormat.selectFields(7, 0, 10, 8);
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+ rowOrcInputFormat.open(splits[0]);
+
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ Row row = rowOrcInputFormat.nextRecord(null);
+
+ // validate first row
+ assertNotNull(row);
+ assertEquals(4, row.getArity());
+ // check binary
+ assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(0));
+ // check boolean
+ assertEquals(false, row.getField(1));
+ // check list
+ assertTrue(row.getField(2) instanceof Object[]);
+ Object[] list1 = (Object[]) row.getField(2);
+ assertEquals(2, list1.length);
+ assertEquals(Row.of(3, "good"), list1[0]);
+ assertEquals(Row.of(4, "bad"), list1[1]);
+ // check string
+ assertEquals("hi", row.getField(3));
+
+ // check that there is a second row with four fields
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ row = rowOrcInputFormat.nextRecord(null);
+ assertNotNull(row);
+ assertEquals(4, row.getArity());
+ assertTrue(rowOrcInputFormat.reachedEnd());
+ }
+
+ @Test
+ public void testReadFileInSplits() throws IOException{
+
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+ rowOrcInputFormat.selectFields(0, 1);
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(4);
+ assertEquals(4, splits.length);
+ rowOrcInputFormat.openInputFormat();
+
+ long cnt = 0;
+ // read all splits
+ for (FileInputSplit split : splits) {
+
+ // open split
+ rowOrcInputFormat.open(split);
+ // read and count all rows
+ while (!rowOrcInputFormat.reachedEnd()) {
+ assertNotNull(rowOrcInputFormat.nextRecord(null));
+ cnt++;
+ }
+ }
+ // check that all rows have been read
+ assertEquals(1920800, cnt);
+ }
+
+ @Test
+ public void testReadFileWithFilter() throws IOException{
+
+ rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+ rowOrcInputFormat.selectFields(0, 1);
+
+ // read head and tail of file
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Or(
+ new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L),
+ new OrcRowInputFormat.Not(
+ new OrcRowInputFormat.LessThanEquals("_col0", PredicateLeaf.Type.LONG, 1920000L))
+ ));
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+
+ // open split
+ rowOrcInputFormat.open(splits[0]);
+
+ // read and count all rows
+ long cnt = 0;
+ while (!rowOrcInputFormat.reachedEnd()) {
+ assertNotNull(rowOrcInputFormat.nextRecord(null));
+ cnt++;
+ }
+ // check that only the first and last stripes of the file have been read.
+ // Each stripe has 5000 rows, except the last which has 800 rows.
+ assertEquals(5800, cnt);
+ }
+
+ @Test
+ public void testReadFileWithEvolvedSchema() throws IOException{
+
+ rowOrcInputFormat = new OrcRowInputFormat(
+ getPath(TEST_FILE_FLAT),
+ "struct<_col0:int,_col1:string,_col4:string,_col3:string>", // previous version of schema
+ new Configuration());
+ rowOrcInputFormat.selectFields(3, 0, 2);
+
+ rowOrcInputFormat.addPredicate(
+ new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L));
+
+ FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+ assertEquals(1, splits.length);
+ rowOrcInputFormat.openInputFormat();
+
+ // open split
+ rowOrcInputFormat.open(splits[0]);
+
+ // read and validate first row
+ assertFalse(rowOrcInputFormat.reachedEnd());
+ Row row = rowOrcInputFormat.nextRecord(null);
+ assertNotNull(row);
+ assertEquals(3, row.getArity());
+ assertEquals("Primary", row.getField(0));
+ assertEquals(1, row.getField(1));
+ assertEquals("M", row.getField(2));
+
+ // read and count remaining rows
+ long cnt = 1;
+ while (!rowOrcInputFormat.reachedEnd()) {
+ assertNotNull(rowOrcInputFormat.nextRecord(null));
+ cnt++;
+ }
+ // check that only the first stripe of the file has been read.
+ // The first stripe has 5000 rows.
+ assertEquals(5000, cnt);
+ }
+
+ private String getPath(String fileName) {
+ return getClass().getClassLoader().getResource(fileName).getPath();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
index 3de6ab3..e6ef1e1 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
@@ -18,125 +18,101 @@
package org.apache.flink.orc;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
import org.junit.Test;
-import java.net.URL;
-import java.util.ArrayList;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+
/**
* Tests for {@link OrcTableSource}.
*/
public class OrcTableSourceITCase extends MultipleProgramsTestBase {
- private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
- "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
- "middle:struct<list:array<struct<int1:int,string1:string>>>," +
- "list:array<struct<int1:int,string1:string>>," +
- "map:map<string,struct<int1:int,string1:string>>>";
-
- private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
-
-
- private static final String[] TEST1_DATA = new String[] {
- "false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
- "true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
- "[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
+ private static final String TEST_FILE_FLAT = "test-data-flat.orc";
+ private static final String TEST_SCHEMA_FLAT =
+ "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>";
public OrcTableSourceITCase() {
super(TestExecutionMode.COLLECTION);
}
@Test
- public void testOrcTableSource() throws Exception {
+ public void testFullScan() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
- assert (test1URL != null);
- OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
-
- tEnv.registerTableSource("orcTable", orc);
-
- String query = "Select * from orcTable";
- Table t = tEnv.sql(query);
+ OrcTableSource orc = OrcTableSource.builder()
+ .path(getPath(TEST_FILE_FLAT))
+ .forOrcSchema(TEST_SCHEMA_FLAT)
+ .build();
+ tEnv.registerTableSource("OrcTable", orc);
+
+ String query =
+ "SELECT COUNT(*), " +
+ "MIN(_col0), MAX(_col0), " +
+ "MIN(_col1), MAX(_col1), " +
+ "MIN(_col2), MAX(_col2), " +
+ "MIN(_col3), MAX(_col3), " +
+ "MIN(_col4), MAX(_col4), " +
+ "MIN(_col5), MAX(_col5), " +
+ "MIN(_col6), MAX(_col6), " +
+ "MIN(_col7), MAX(_col7), " +
+ "MIN(_col8), MAX(_col8) " +
+ "FROM OrcTable";
+ Table t = tEnv.sqlQuery(query);
DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
- List<Row> records = dataSet.collect();
-
- Assert.assertEquals(records.size(), 2);
+ List<Row> result = dataSet.collect();
- List<String> actualRecords = new ArrayList<>();
- for (Row record : records) {
- Assert.assertEquals(record.getArity(), 12);
- actualRecords.add(record.toString());
- }
-
- Assert.assertThat(actualRecords, CoreMatchers.hasItems(TEST1_DATA));
+ assertEquals(1, result.size());
+ assertEquals(
+ "1920800,1,1920800,F,M,D,W,2 yr Degree,Unknown,500,10000,Good,Unknown,0,6,0,6,0,6",
+ result.get(0).toString());
}
@Test
- public void testOrcTableProjection() throws Exception {
+ public void testScanWithProjectionAndFilter() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
- assert(test1URL != null);
- OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
-
- tEnv.registerTableSource("orcTable", orc);
-
- String query = "Select middle,list,map from orcTable";
- Table t = tEnv.sql(query);
-
- String[] colNames = new String[] {"middle", "list", "map"};
-
- RowTypeInfo rowTypeInfo = new RowTypeInfo(
- new TypeInformation[] {
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO},
- new String[] {"int1", "string1"});
-
- RowTypeInfo structTypeInfo = new RowTypeInfo(
- new TypeInformation[] {ObjectArrayTypeInfo.getInfoFor(rowTypeInfo)},
- new String[] {"list"});
-
- TypeInformation[] colTypes = new TypeInformation[] {
- structTypeInfo,
- ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
- new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
- };
-
- TableSchema actualTableSchema = new TableSchema(colNames, colTypes);
-
- Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
- Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
- Assert.assertEquals(actualTableSchema.toString(), t.getSchema().toString());
+ OrcTableSource orc = OrcTableSource.builder()
+ .path(getPath(TEST_FILE_FLAT))
+ .forOrcSchema(TEST_SCHEMA_FLAT)
+ .build();
+ tEnv.registerTableSource("OrcTable", orc);
+
+ String query =
+ "SELECT " +
+ "MIN(_col4), MAX(_col4), " +
+ "MIN(_col3), MAX(_col3), " +
+ "MIN(_col0), MAX(_col0), " +
+ "MIN(_col2), MAX(_col2), " +
+ "COUNT(*) " +
+ "FROM OrcTable " +
+ "WHERE (_col0 BETWEEN 4975 and 5024 OR _col0 BETWEEN 9975 AND 10024) AND _col1 = 'F'";
+ Table t = tEnv.sqlQuery(query);
DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
- List<Row> records = dataSet.collect();
-
- Assert.assertEquals(records.size(), 2);
- for (Row record: records) {
- Assert.assertEquals(record.getArity(), 3);
- }
+ List<Row> result = dataSet.collect();
+ assertEquals(1, result.size());
+ assertEquals(
+ "1500,6000,2 yr Degree,Unknown,4976,10024,D,W,50",
+ result.get(0).toString());
}
+ private String getPath(String fileName) {
+ return getClass().getClassLoader().getResource(fileName).getPath();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
index c285054..4e4be77 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
@@ -18,96 +18,248 @@
package org.apache.flink.orc;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GetCompositeField;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.ItemAt;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.types.Row;
-import org.junit.Assert;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
-import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
* Unit Tests for {@link OrcTableSource}.
*/
public class OrcTableSourceTest {
- private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
- "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
- "middle:struct<list:array<struct<int1:int,string1:string>>>," +
- "list:array<struct<int1:int,string1:string>>," +
- "map:map<string,struct<int1:int,string1:string>>>";
+ private static final String TEST_FILE_NESTED = "test-data-nested.orc";
+ private static final String TEST_SCHEMA_NESTED =
+ "struct<" +
+ "boolean1:boolean," +
+ "byte1:tinyint," +
+ "short1:smallint," +
+ "int1:int," +
+ "long1:bigint," +
+ "float1:float," +
+ "double1:double," +
+ "bytes1:binary," +
+ "string1:string," +
+ "middle:struct<" +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">," +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">," +
+ "map:map<" +
+ "string," +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">";
+
+ @Test
+ public void testGetReturnType() throws Exception {
+
+ OrcTableSource orc = OrcTableSource.builder()
+ .path(getPath(TEST_FILE_NESTED))
+ .forOrcSchema(TEST_SCHEMA_NESTED)
+ .build();
- private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
+ TypeInformation<Row> returnType = orc.getReturnType();
+ assertNotNull(returnType);
+ assertTrue(returnType instanceof RowTypeInfo);
+ RowTypeInfo rowType = (RowTypeInfo) returnType;
+
+ RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
+ assertEquals(expected, rowType);
+ }
@Test
- public void testOrcSchema() throws Exception {
+ public void testGetTableSchema() throws Exception {
+
+ OrcTableSource orc = OrcTableSource.builder()
+ .path(getPath(TEST_FILE_NESTED))
+ .forOrcSchema(TEST_SCHEMA_NESTED)
+ .build();
+
+ TableSchema schema = orc.getTableSchema();
+ assertNotNull(schema);
+ assertArrayEquals(getNestedFieldNames(), schema.getColumnNames());
+ assertArrayEquals(getNestedFieldTypes(), schema.getTypes());
+ }
+
+ @Test
+ public void testProjectFields() throws Exception {
+
+ OrcTableSource orc = OrcTableSource.builder()
+ .path(getPath(TEST_FILE_NESTED))
+ .forOrcSchema(TEST_SCHEMA_NESTED)
+ .build();
+
+ OrcTableSource projected = (OrcTableSource) orc.projectFields(new int[]{3, 5, 1, 0});
- assert(test1URL != null);
- OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
+ // ensure copy is returned
+ assertTrue(orc != projected);
- String expectedSchema = "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer, long1: Long, " +
- "float1: Float, double1: Double, bytes1: byte[], string1: String, " +
- "middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>), " +
- "list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>, " +
- "map: Map<String, Row(int1: Integer, string1: String)>)";
+ // ensure table schema is identical
+ assertEquals(orc.getTableSchema(), projected.getTableSchema());
- Assert.assertEquals(expectedSchema, orc.getReturnType().toString());
+ // ensure return type was adapted
+ String[] fieldNames = getNestedFieldNames();
+ TypeInformation[] fieldTypes = getNestedFieldTypes();
+ assertEquals(
+ Types.ROW_NAMED(
+ new String[] {fieldNames[3], fieldNames[5], fieldNames[1], fieldNames[0]},
+ new TypeInformation[] {fieldTypes[3], fieldTypes[5], fieldTypes[1], fieldTypes[0]}),
+ projected.getReturnType());
+ // ensure IF is configured with selected fields
+ OrcTableSource spyTS = spy(projected);
+ OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
+ doReturn(mockIF).when(spyTS).buildOrcInputFormat();
+ spyTS.getDataSet(mock(ExecutionEnvironment.class));
+ verify(mockIF).selectFields(eq(3), eq(5), eq(1), eq(0));
}
@Test
- public void testOrcTableSchema() throws Exception {
+ public void testApplyPredicate() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+ OrcTableSource orc = OrcTableSource.builder()
+ .path(getPath(TEST_FILE_NESTED))
+ .forOrcSchema(TEST_SCHEMA_NESTED)
+ .build();
- assert(test1URL != null);
- OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
+ // expressions for predicates
+ Expression pred1 = new GreaterThan(
+ new ResolvedFieldReference("int1", Types.INT),
+ new Literal(100, Types.INT));
+ Expression pred2 = new EqualTo(
+ new ResolvedFieldReference("string1", Types.STRING),
+ new Literal("hello", Types.STRING));
+ Expression pred3 = new EqualTo(
+ new GetCompositeField(
+ new ItemAt(
+ new ResolvedFieldReference(
+ "list",
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING))),
+ new Literal(1, Types.INT)),
+ "int1"),
+ new Literal(1, Types.INT)
+ );
+ ArrayList<Expression> preds = new ArrayList<>();
+ preds.add(pred1);
+ preds.add(pred2);
+ preds.add(pred3);
- tEnv.registerTableSource("orcTable", orc);
- String query = "Select * from orcTable";
- Table t = tEnv.sql(query);
+ // apply predicates on TableSource
+ OrcTableSource projected = (OrcTableSource) orc.applyPredicate(preds);
- String[] colNames = new String[] {
- "boolean1", "byte1", "short1", "int1", "long1", "float1",
- "double1", "bytes1", "string1", "list", "list0", "map"
- };
+ // ensure copy is returned
+ assertTrue(orc != projected);
- RowTypeInfo rowTypeInfo = new RowTypeInfo(
- new TypeInformation[] {
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO},
- new String[] {"int1", "string1"});
-
- TypeInformation[] colTypes = new TypeInformation[] {
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.BYTE_TYPE_INFO,
- BasicTypeInfo.SHORT_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.FLOAT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
- ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
- new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
- };
- TableSchema expectedTableSchema = new TableSchema(colNames, colTypes);
+ // ensure table schema is identical
+ assertEquals(orc.getTableSchema(), projected.getTableSchema());
+
+ // ensure return type is identical
+ assertEquals(
+ Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
+ projected.getReturnType());
+
+ // ensure IF is configured with supported predicates
+ OrcTableSource spyTS = spy(projected);
+ OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
+ doReturn(mockIF).when(spyTS).buildOrcInputFormat();
+ spyTS.getDataSet(mock(ExecutionEnvironment.class));
+
+ ArgumentCaptor<OrcRowInputFormat.Predicate> arguments = ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
+ verify(mockIF, times(2)).addPredicate(arguments.capture());
+ List<String> values = arguments.getAllValues().stream().map(Object::toString).collect(Collectors.toList());
+ assertTrue(values.contains(
+ new OrcRowInputFormat.Not(new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100)).toString()));
+ assertTrue(values.contains(
+ new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello").toString()));
+
+ // ensure filter pushdown is correct
+ assertTrue(spyTS.isFilterPushedDown());
+ assertFalse(orc.isFilterPushedDown());
+ }
+
+ private String getPath(String fileName) {
+ return getClass().getClassLoader().getResource(fileName).getPath();
+ }
- Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
- Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
- Assert.assertEquals(expectedTableSchema.toString(), t.getSchema().toString());
+ private String[] getNestedFieldNames() {
+ return new String[] {
+ "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map"
+ };
+ }
+ private TypeInformation[] getNestedFieldTypes() {
+ return new TypeInformation[]{
+ Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+ Types.ROW_NAMED(
+ new String[]{"list"},
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ )
+ ),
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ ),
+ new MapTypeInfo<>(
+ Types.STRING,
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ )
+ };
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
new file mode 100644
index 0000000..2cb1715
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrcUtils}.
+ */
+public class OrcUtilsTest {
+
+ @Test
+ public void testFlatSchemaToTypeInfo1() {
+
+ String schema =
+ "struct<" +
+ "boolean1:boolean," +
+ "byte1:tinyint," +
+ "short1:smallint," +
+ "int1:int," +
+ "long1:bigint," +
+ "float1:float," +
+ "double1:double," +
+ "bytes1:binary," +
+ "string1:string," +
+ "date1:date," +
+ "timestamp1:timestamp," +
+ "decimal1:decimal(5,2)" +
+ ">";
+ TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+ Assert.assertNotNull(typeInfo);
+ Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+ RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+ // validate field types
+ Assert.assertArrayEquals(
+ new TypeInformation[]{
+ Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+ Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
+ },
+ rowTypeInfo.getFieldTypes());
+
+ // validate field names
+ Assert.assertArrayEquals(
+ new String[] {
+ "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
+ "bytes1", "string1", "date1", "timestamp1", "decimal1"
+ },
+ rowTypeInfo.getFieldNames());
+
+ }
+
+ @Test
+ public void testNestedSchemaToTypeInfo1() {
+
+ String schema =
+ "struct<" +
+ "middle:struct<" +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">," +
+ "list:array<" +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">," +
+ "map:map<" +
+ "string," +
+ "struct<" +
+ "int1:int," +
+ "string1:string" +
+ ">" +
+ ">" +
+ ">";
+ TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+ Assert.assertNotNull(typeInfo);
+ Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+ RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+ // validate field types
+ Assert.assertArrayEquals(
+ new TypeInformation[]{
+ Types.ROW_NAMED(
+ new String[]{"list"},
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ )
+ ),
+ ObjectArrayTypeInfo.getInfoFor(
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ ),
+ new MapTypeInfo<>(
+ Types.STRING,
+ Types.ROW_NAMED(
+ new String[]{"int1", "string1"},
+ Types.INT, Types.STRING
+ )
+ )
+ },
+ rowTypeInfo.getFieldTypes());
+
+ // validate field names
+ Assert.assertArrayEquals(
+ new String[] {"middle", "list", "map"},
+ rowTypeInfo.getFieldNames());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
deleted file mode 100644
index 60008a0..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Tests for the {@link RowOrcInputFormat}.
- */
-
-public class RowOrcInputFormatTest {
-
- private RowOrcInputFormat rowOrcInputFormat;
-
- @After
- public void tearDown() throws IOException {
- if (rowOrcInputFormat != null) {
- rowOrcInputFormat.close();
- rowOrcInputFormat.closeInputFormat();
- }
- rowOrcInputFormat = null;
- }
-
- private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
-
- private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
- "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
- "middle:struct<list:array<struct<int1:int,string1:string>>>," +
- "list:array<struct<int1:int,string1:string>>," +
- "map:map<string,struct<int1:int,string1:string>>>";
-
- private static final String[] TEST1_DATA = new String[] {
- "false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
- "true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
- "[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
-
- private static final String[] TEST1_PROJECTED_DATA = new String[] {
- "{},[3,good, 4,bad],[1,bye, 2,sigh],hi,[0, 1, 2, 3, 4],-15.0,1.0,9223372036854775807,65536,1024,1,false",
- "{chani=5,chani, mauddib=1,mauddib},[100000000,cat, -100000,in, 1234,hat],[1,bye, 2,sigh],bye," +
- "[],-5.0,2.0,9223372036854775807,65536,2048,100,true" };
-
- private static final String TEST1_INVALID_SCHEMA = "struct<boolean1:int,byte1:tinyint,short1:smallint,int1:int," +
- "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
- "middle:struct<list:array<struct<int1:int,string1:string>>>," +
- "list:array<struct<int1:int,string1:string>>," +
- "map:map<string,struct<int1:int,string1:string>>>";
-
- @Test(expected = FileNotFoundException.class)
- public void testInvalidPath() throws IOException{
-
- rowOrcInputFormat = new RowOrcInputFormat("TestOrcFile.test2.orc", TEST1_SCHEMA, new Configuration());
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
- rowOrcInputFormat.open(inputSplits[0]);
-
- }
-
- @Test(expected = RuntimeException.class)
- public void testInvalidSchema() throws IOException{
-
- assert(test1URL != null);
- rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_INVALID_SCHEMA, new Configuration());
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
- rowOrcInputFormat.open(inputSplits[0]);
-
- }
-
- @Test(expected = IndexOutOfBoundsException.class)
- public void testInvalidProjection() throws IOException{
-
- assert(test1URL != null);
- rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
- int[] projectionMask = {14};
- rowOrcInputFormat.setFieldMapping(projectionMask);
- }
-
- @Test
- public void testMajorDataTypes() throws IOException{
-
- // test for boolean,byte,short,int,long,float,double,bytes,string,struct,list,map
- assert(test1URL != null);
- rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = null;
- int count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- Assert.assertEquals(row.toString(), TEST1_DATA[count++]);
- }
- }
- }
-
- @Test
- public void testProjection() throws IOException{
-
- assert(test1URL != null);
- rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
- int[] projectionMask = {11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
- rowOrcInputFormat.setFieldMapping(projectionMask);
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = null;
- int count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- Assert.assertEquals(row.toString(), TEST1_PROJECTED_DATA[count++]);
- }
- }
-
- }
-
- @Test
- public void testTimeStampAndDate() throws IOException{
-
- URL expectedDataURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.dat");
- assert(expectedDataURL != null);
- List<String> expectedTimeStampAndDate = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
-
- URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<time:timestamp,date:date>";
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
- rowOrcInputFormat.openInputFormat();
-
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- List<Object> actualTimeStampAndDate = new ArrayList<>();
-
- Row row = null;
- int count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- count++;
- if (count <= 10000) {
- actualTimeStampAndDate.add(row.getField(0) + "," + row.getField(1));
- }
-
- }
- }
- Assert.assertEquals(count, 70000);
- Assert.assertEquals(expectedTimeStampAndDate.size(), actualTimeStampAndDate.size());
- Assert.assertEquals(expectedTimeStampAndDate.toString(), actualTimeStampAndDate.toString());
-
- }
-
- @Test
- public void testDecimal() throws IOException{
-
- URL expectedDataURL = getClass().getClassLoader().getResource("decimal.dat");
- List<String> expectedDecimal = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
-
- URL testInputURL = getClass().getClassLoader().getResource("decimal.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<_col0:decimal(10,5)>";
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
- rowOrcInputFormat.openInputFormat();
-
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- List<Object> actualDecimal = new ArrayList<>();
-
- Row row = null;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- actualDecimal.add(row.getField(0));
- }
- }
-
- Assert.assertEquals(expectedDecimal.size(), actualDecimal.size());
- Assert.assertEquals(expectedDecimal.toString(), actualDecimal.toString());
-
- }
-
- @Test
- public void testEmptyFile() throws IOException{
-
- URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.emptyFile.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
-
- rowOrcInputFormat = new RowOrcInputFormat(path, TEST1_SCHEMA, new Configuration());
- rowOrcInputFormat.openInputFormat();
-
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = new Row(1);
- int count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- count++;
- }
- }
-
- Assert.assertEquals(count, 0);
- }
-
- @Test
- public void testLargeFile() throws IOException{
-
- URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
- "_col5:string,_col6:int,_col7:int,_col8:int>";
-
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
- rowOrcInputFormat.openInputFormat();
-
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = new Row(1);
- int count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- count++;
- }
- }
-
- Assert.assertEquals(count, 1920800);
- }
-
- @Test
- public void testProducedType() throws IOException{
-
- assert(test1URL != null);
- rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- rowOrcInputFormat.open(inputSplits[0]);
-
- TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
- Assert.assertEquals(type.toString(), "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer," +
- " long1: Long, float1: Float, double1: Double, bytes1: byte[], string1: String," +
- " middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
- " list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
- " map: Map<String, Row(int1: Integer, string1: String)>)");
-
- }
-
- @Test
- public void testProducedTypeWithProjection() throws IOException{
-
- assert(test1URL != null);
- rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
- int[] projectionMask = {9, 10, 11};
- rowOrcInputFormat.setFieldMapping(projectionMask);
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- rowOrcInputFormat.open(inputSplits[0]);
-
- TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
- Assert.assertEquals(type.toString(), "Row(middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
- " list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
- " map: Map<String, Row(int1: Integer, string1: String)>)");
-
- }
-
- @Test
- public void testLongList() throws Exception {
-
- URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listlong.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<mylist1:array<bigint>>";
-
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = null;
- long count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- Assert.assertEquals(row.getArity(), 1);
- Object object = row.getField(0);
- long[] l = (long[]) object;
-
- Assert.assertEquals(l.length, 2);
- if (count < 50) {
- Assert.assertArrayEquals(l, new long[]{count, count + 1});
- }
- else {
- Assert.assertArrayEquals(l, new long[]{0L, 0L});
- }
- count = count + 2;
- }
- }
- Assert.assertEquals(count, 100);
- }
-
- @Test
- public void testStringList() throws Exception {
-
- URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.liststring.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<mylist1:array<string>>";
-
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = null;
- long count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- Assert.assertEquals(row.getArity(), 1);
- Object object = row.getField(0);
- String[] l = (String[]) object;
-
- Assert.assertEquals(l.length, 2);
- Assert.assertArrayEquals(l, new String[]{"hello" + count, "hello" + (count + 1) });
- count = count + 2;
- }
- }
- Assert.assertEquals(count, 200);
- }
-
- @Test
- public void testListOfListOfStructOfLong() throws Exception {
- URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listliststructlong.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
-
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
- rowOrcInputFormat.openInputFormat();
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
- Assert.assertEquals(inputSplits.length, 1);
-
- Row row = null;
- long count = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- while (!rowOrcInputFormat.reachedEnd()) {
-
- row = rowOrcInputFormat.nextRecord(row);
- Assert.assertEquals(row.getArity(), 1);
-
- Object[] objects = (Object[]) row.getField(0);
- Assert.assertEquals(objects.length, 1);
-
- Object[] objects1 = (Object[]) objects[0];
- Assert.assertEquals(objects1.length, 1);
-
- Row[] nestedRows = Arrays.copyOf(objects1, objects1.length, Row[].class);
- Assert.assertEquals(nestedRows.length, 1);
-
- Assert.assertEquals(nestedRows[0].getArity(), 1);
-
- Assert.assertEquals(nestedRows[0].getField(0), count);
-
- count++;
- }
- }
- Assert.assertEquals(count, 100);
- }
-
- @Test
- public void testSplit() throws IOException{
-
- URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
- assert(testInputURL != null);
- String path = testInputURL.getPath();
- String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
- "_col5:string,_col6:int,_col7:int,_col8:int>";
-
- rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
- rowOrcInputFormat.openInputFormat();
-
- FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(10);
-
- Assert.assertEquals(inputSplits.length, 10);
-
- Row row = null;
- int countTotalRecords = 0;
- for (FileInputSplit split : inputSplits) {
- rowOrcInputFormat.open(split);
- int countSplitRecords = 0;
- while (!rowOrcInputFormat.reachedEnd()) {
- row = rowOrcInputFormat.nextRecord(row);
- countSplitRecords++;
- }
- Assert.assertNotEquals(countSplitRecords, 1920800);
- countTotalRecords += countSplitRecords;
- }
-
- Assert.assertEquals(countTotalRecords, 1920800);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc
deleted file mode 100644
index ecdadcb..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc
deleted file mode 100644
index 0f3f9c8..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc
deleted file mode 100644
index 648ea18..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc
deleted file mode 100644
index 75a5f2a..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/200612ee/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc
deleted file mode 100644
index 4fb0bef..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc and /dev/null differ