Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/26 00:39:20 UTC

[spark] branch master updated: [SPARK-27226][SQL] Reduce the code duplicate when upgrading built-in Hive

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 300ec1a  [SPARK-27226][SQL] Reduce the code duplicate when upgrading built-in Hive
300ec1a is described below

commit 300ec1a74cb14867c22e616e657566d510426331
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Mon Mar 25 19:39:00 2019 -0500

    [SPARK-27226][SQL] Reduce the code duplicate when upgrading built-in Hive
    
    ## What changes were proposed in this pull request?
    
    This PR is related to #24119. It reduces code duplication when upgrading the built-in Hive.
    To achieve this, we should avoid using classes from `org.apache.orc.storage.*`, because these classes will be replaced with their `org.apache.hadoop.hive.*` counterparts after upgrading the built-in Hive. For example:
    ![image](https://user-images.githubusercontent.com/5399861/54437594-e9be1000-476f-11e9-8878-3b7414871ee5.png)
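
    For illustration, here is a minimal, hypothetical sketch (the object name is made up) of the kind of helper that currently has to be copied, where only the import differs between the two flavors:

    ```scala
    // Illustrative only: "DateShimExample" is a hypothetical object, not a class in Spark.
    // Built-in ORC today: the class comes from the shaded org.apache.orc.storage package.
    import org.apache.orc.storage.serde2.io.DateWritable
    // After upgrading the built-in Hive, the same helper needs
    //   import org.apache.hadoop.hive.serde2.io.DateWritable
    // instead; the method body does not change at all.

    object DateShimExample {
      def getSqlDate(value: Any): java.sql.Date = value.asInstanceOf[DateWritable].get
    }
    ```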
    
    - Move all usages of `org.apache.orc.storage.*` into `OrcShimUtils` (a sketch of the upgraded-Hive copy follows this list):
    1. Add a wrapper for `VectorizedRowBatch` (reduces code duplication in [OrcColumnarBatchReader](https://github.com/apache/spark/pull/24166/files#diff-e594f7295e5408c01ace8175166313b6)).
    2. Move some serializer/deserializer methods out of `OrcDeserializer` and `OrcSerializer` (reduces code duplication in [OrcDeserializer](https://github.com/apache/spark/pull/24166/files#diff-b933819e6dcaff41eee8fce1e8f2932c) and [OrcSerializer](https://github.com/apache/spark/pull/24166/files#diff-6d3849d88929f6ea25c436d71da729da)).
    3. Define two type aliases, `Operator` and `SearchArgument` (reduces code duplication in [OrcV1FilterSuite](https://github.com/apache/spark/pull/24166/files#diff-48c4fc7a3b3384a6d0aab246723a0058)).
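
    As a rough sketch of why this helps (not part of this commit), the copy of `OrcShimUtils` for the upgraded built-in Hive would only need its imports swapped, for example:

    ```scala
    // Hypothetical copy of OrcShimUtils for the upgraded built-in Hive (NOT part of this
    // commit): only the imports move from org.apache.orc.storage.* to
    // org.apache.hadoop.hive.*; the wrapper, aliases and helpers keep the same bodies
    // as the OrcShimUtils.scala added further down in this diff.
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
    import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
    import org.apache.hadoop.hive.serde2.io.DateWritable

    private[sql] object OrcShimUtils {
      class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}

      private[sql] type Operator = OrcOperator
      private[sql] type SearchArgument = OrcSearchArgument

      def getSqlDate(value: Any): java.sql.Date = value.asInstanceOf[DateWritable].get
      // getDecimal, getDateWritable and getHiveDecimalWritable would be copied unchanged,
      // pulling HiveDecimal/HiveDecimalWritable from org.apache.hadoop.hive.* as well.
    }
    ```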
    
    - Move duplicated code into a common parent (a sketch follows this list):
    1. Add an `OrcFiltersBase` trait for `OrcFilters` (reduces code duplication in [OrcFilters](https://github.com/apache/spark/pull/24166/files#diff-224b8cbedf286ecbfdd092d1e2e2f237)).
    2. Move `checkNoFilterPredicate` from `OrcFilterSuite` to `OrcTest` (reduces code duplication in [OrcFilterSuite](https://github.com/apache/spark/pull/24166/files#diff-8e05c1faaaec98edd7723e62f84066f1)).
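
    Likewise, a second `OrcFilters` built against the upgraded Hive could extend the same trait, so only the `SearchArgument` construction would be duplicated. A hypothetical sketch, not part of this commit:

    ```scala
    // Hypothetical second OrcFilters for the upgraded built-in Hive (NOT part of this
    // commit): buildTree, quoteAttributeNameIfNeeded and isSearchableType are inherited
    // from the OrcFiltersBase trait added in this diff, so they are written only once.
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument

    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.StructType

    private[sql] object OrcFilters extends OrcFiltersBase {
      def createFilter(schema: StructType, filters: Seq[Filter]): Option[SearchArgument] = {
        ??? // only the Hive/ORC-version-specific SearchArgument construction lives here
      }
    }
    ```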
    
    After this PR, we only need to copy these four files when upgrading the built-in Hive: `OrcColumnVector`, `OrcFilters`, `OrcFilterSuite`, and `OrcShimUtils`.
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24166 from wangyum/SPARK-27226.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../datasources/orc/OrcColumnarBatchReader.java    | 16 +++---
 .../datasources/orc/OrcDeserializer.scala          |  6 +-
 .../sql/execution/datasources/orc/OrcFilters.scala | 34 +----------
 .../execution/datasources/orc/OrcFiltersBase.scala | 58 +++++++++++++++++++
 .../execution/datasources/orc/OrcSerializer.scala  | 16 +-----
 .../execution/datasources/orc/OrcShimUtils.scala   | 66 ++++++++++++++++++++++
 .../execution/datasources/orc/OrcFilterSuite.scala | 28 ---------
 .../sql/execution/datasources/orc/OrcTest.scala    | 34 ++++++++++-
 .../datasources/orc/OrcV1FilterSuite.scala         |  5 +-
 9 files changed, 174 insertions(+), 89 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index efca96e..6a4b116 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -30,9 +30,9 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.storage.ql.exec.vector.*;
 
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.types.*;
@@ -48,8 +48,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // The capacity of vectorized batch.
   private int capacity;
 
-  // Vectorized ORC Row Batch
-  private VectorizedRowBatch batch;
+  // Vectorized ORC Row Batch wrap.
+  private VectorizedRowBatchWrap wrap;
 
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
@@ -146,8 +146,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
       int[] requestedDataColIds,
       int[] requestedPartitionColIds,
       InternalRow partitionValues) {
-    batch = orcSchema.createRowBatch(capacity);
-    assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
+    wrap = new VectorizedRowBatchWrap(orcSchema.createRowBatch(capacity));
+    assert(!wrap.batch().selectedInUse); // `selectedInUse` should be initialized with `false`.
     assert(requiredFields.length == requestedDataColIds.length);
     assert(requiredFields.length == requestedPartitionColIds.length);
     // If a required column is also partition column, use partition value and don't read from file.
@@ -180,7 +180,7 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
           missingCol.setIsConstant();
           orcVectorWrappers[i] = missingCol;
         } else {
-          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+          orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]);
         }
       }
     }
@@ -193,8 +193,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
    */
   private boolean nextBatch() throws IOException {
-    recordReader.nextBatch(batch);
-    int batchSize = batch.size;
+    recordReader.nextBatch(wrap.batch());
+    int batchSize = wrap.batch().size;
     if (batchSize == 0) {
       return false;
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 62e1670..6d52d40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import org.apache.hadoop.io._
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
@@ -109,14 +108,13 @@ class OrcDeserializer(
         updater.set(ordinal, bytes)
 
       case DateType => (ordinal, value) =>
-        updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))
+        updater.setInt(ordinal, DateTimeUtils.fromJavaDate(OrcShimUtils.getSqlDate(value)))
 
       case TimestampType => (ordinal, value) =>
         updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))
 
       case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
-        val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
-        val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
+        val v = OrcShimUtils.getDecimal(value)
         v.changePrecision(precision, scale)
         updater.set(ordinal, v)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 9848400..112dcb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -23,7 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
 import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable
 
-import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
 /**
@@ -56,27 +56,7 @@ import org.apache.spark.sql.types._
  * builder methods mentioned above can only be found in test code, where all tested filters are
  * known to be convertible.
  */
-private[sql] object OrcFilters {
-  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
-    filters match {
-      case Seq() => None
-      case Seq(filter) => Some(filter)
-      case Seq(filter1, filter2) => Some(And(filter1, filter2))
-      case _ => // length > 2
-        val (left, right) = filters.splitAt(filters.length / 2)
-        Some(And(buildTree(left).get, buildTree(right).get))
-    }
-  }
-
-  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
-  // in order to distinguish predicate pushdown for nested columns.
-  private def quoteAttributeNameIfNeeded(name: String) : String = {
-    if (!name.contains("`") && name.contains(".")) {
-      s"`$name`"
-    } else {
-      name
-    }
-  }
+private[sql] object OrcFilters extends OrcFiltersBase {
 
   /**
    * Create ORC filter as a SearchArgument instance.
@@ -102,16 +82,6 @@ private[sql] object OrcFilters {
   }
 
   /**
-   * Return true if this is a searchable type in ORC.
-   * Both CharType and VarcharType are cleaned at AstBuilder.
-   */
-  private def isSearchableType(dataType: DataType) = dataType match {
-    case BinaryType => false
-    case _: AtomicType => true
-    case _ => false
-  }
-
-  /**
    * Get PredicateLeafType which is corresponding to the given DataType.
    */
   private def getPredicateLeafType(dataType: DataType) = dataType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
new file mode 100644
index 0000000..8d4898a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}
+
+/**
+ * Methods that can be shared when upgrading the built-in Hive.
+ */
+trait OrcFiltersBase {
+
+  private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
+    filters match {
+      case Seq() => None
+      case Seq(filter) => Some(filter)
+      case Seq(filter1, filter2) => Some(And(filter1, filter2))
+      case _ => // length > 2
+        val (left, right) = filters.splitAt(filters.length / 2)
+        Some(And(buildTree(left).get, buildTree(right).get))
+    }
+  }
+
+  // Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
+  // in order to distinguish predicate pushdown for nested columns.
+  protected def quoteAttributeNameIfNeeded(name: String) : String = {
+    if (!name.contains("`") && name.contains(".")) {
+      s"`$name`"
+    } else {
+      name
+    }
+  }
+
+  /**
+   * Return true if this is a searchable type in ORC.
+   * Both CharType and VarcharType are cleaned at AstBuilder.
+   */
+  protected def isSearchableType(dataType: DataType) = dataType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
index 90d1268..0b9cbec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.io._
 import org.apache.orc.TypeDescription
 import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -139,14 +137,7 @@ class OrcSerializer(dataSchema: StructType) {
       new BytesWritable(getter.getBinary(ordinal))
 
     case DateType =>
-      if (reuseObj) {
-        val result = new DateWritable()
-        (getter, ordinal) =>
-          result.set(getter.getInt(ordinal))
-          result
-      } else {
-        (getter, ordinal) => new DateWritable(getter.getInt(ordinal))
-      }
+      OrcShimUtils.getDateWritable(reuseObj)
 
     // The following cases are already expensive, reusing object or not doesn't matter.
 
@@ -156,9 +147,8 @@ class OrcSerializer(dataSchema: StructType) {
       result.setNanos(ts.getNanos)
       result
 
-    case DecimalType.Fixed(precision, scale) => (getter, ordinal) =>
-      val d = getter.getDecimal(ordinal, precision, scale)
-      new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
+    case DecimalType.Fixed(precision, scale) =>
+      OrcShimUtils.getHiveDecimalWritable(precision, scale)
 
     case st: StructType => (getter, ordinal) =>
       val result = createOrcValue(st).asInstanceOf[OrcStruct]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
new file mode 100644
index 0000000..68503ab
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.sql.Date
+
+import org.apache.orc.storage.common.`type`.HiveDecimal
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
+import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
+import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
+import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.types.Decimal
+
+/**
+ * Various utilities for ORC used to upgrade the built-in Hive.
+ */
+private[sql] object OrcShimUtils {
+
+  class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}
+
+  private[sql] type Operator = OrcOperator
+  private[sql] type SearchArgument = OrcSearchArgument
+
+  def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get
+
+  def getDecimal(value: Any): Decimal = {
+    val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
+    Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
+  }
+
+  def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
+    if (reuseObj) {
+      val result = new DateWritable()
+      (getter, ordinal) =>
+        result.set(getter.getInt(ordinal))
+        result
+    } else {
+      (getter: SpecializedGetters, ordinal: Int) =>
+        new DateWritable(getter.getInt(ordinal))
+    }
+  }
+
+  def getHiveDecimalWritable(precision: Int, scale: Int):
+      (SpecializedGetters, Int) => HiveDecimalWritable = {
+    (getter, ordinal) =>
+      val d = getter.getDecimal(ordinal, precision, scale)
+      new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 7b65ba8..e96c6fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -89,34 +89,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
     checkFilterPredicate(df, predicate, checkLogicalOperator)
   }
 
-  protected def checkNoFilterPredicate
-      (predicate: Predicate, noneSupported: Boolean = false)
-      (implicit df: DataFrame): Unit = {
-    val output = predicate.collect { case a: Attribute => a }.distinct
-    val query = df
-      .select(output.map(e => Column(e)): _*)
-      .where(Column(predicate))
-
-    query.queryExecution.optimizedPlan match {
-      case PhysicalOperation(_, filters,
-      DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
-        assert(filters.nonEmpty, "No filter is analyzed from the given query")
-        val scanBuilder = orcTable.newScanBuilder(options)
-        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
-        val pushedFilters = scanBuilder.pushedFilters()
-        if (noneSupported) {
-          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
-        } else {
-          assert(pushedFilters.nonEmpty, "No filter is pushed down")
-          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
-          assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
-        }
-
-      case _ =>
-        throw new AnalysisException("Can not match OrcTable in the query.")
-    }
-  }
-
   test("filter pushdown - integer") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 411e632..adbd93d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -25,7 +25,11 @@ import scala.reflect.runtime.universe.TypeTag
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 
@@ -104,4 +108,32 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
       assert(actual < numRows)
     }
   }
+
+  protected def checkNoFilterPredicate
+      (predicate: Predicate, noneSupported: Boolean = false)
+      (implicit df: DataFrame): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    query.queryExecution.optimizedPlan match {
+      case PhysicalOperation(_, filters,
+      DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
+        assert(filters.nonEmpty, "No filter is analyzed from the given query")
+        val scanBuilder = orcTable.newScanBuilder(options)
+        scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
+        val pushedFilters = scanBuilder.pushedFilters()
+        if (noneSupported) {
+          assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
+        } else {
+          assert(pushedFilters.nonEmpty, "No filter is pushed down")
+          val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
+          assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
+        }
+
+      case _ =>
+        throw new AnalysisException("Can not match OrcTable in the query.")
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
index 5a1bf9b..4452780 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1FilterSuite.scala
@@ -18,14 +18,13 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import scala.collection.JavaConverters._
 
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.{Operator, SearchArgument}
 import org.apache.spark.sql.internal.SQLConf
 
 class OrcV1FilterSuite extends OrcFilterSuite {
@@ -63,7 +62,7 @@ class OrcV1FilterSuite extends OrcFilterSuite {
   }
 
   override def checkFilterPredicate
-      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+      (predicate: Predicate, filterOperator: Operator)
       (implicit df: DataFrame): Unit = {
     def checkComparisonOperator(filter: SearchArgument) = {
       val operator = filter.getLeaves.asScala


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org