Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/23 07:51:43 UTC

spark git commit: [SPARK-14551][SQL] Reduce number of NameNode calls in OrcRelation

Repository: spark
Updated Branches:
  refs/heads/master 95faa731c -> e5226e300


[SPARK-14551][SQL] Reduce number of NameNode calls in OrcRelation

## What changes were proposed in this pull request?
When FileSourceStrategy is used, a record reader is created, which internally incurs a NameNode (NN) call. Later, in OrcRelation.unwrapOrcStructs, the file information is read again to obtain the ObjectInspector, which incurs an additional NN call. It would be good to avoid this extra call (specifically for partitioned datasets).
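
For reference, the pre-patch per-file path looked roughly like this (condensed from the lines removed in the diff below; conf, fileSplit and file.filePath come from the surrounding OrcRelation code):

    // Before: two independent reads of the ORC file footer per file.
    val inputFormat = new OrcNewInputFormat
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
    // First footer read (NN call) while creating the record reader.
    val orcRecordReader = inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
    // Second footer read (another NN call), done later in unwrapOrcStructs,
    // only to obtain the ObjectInspector.
    val maybeStructOI = OrcFileOperator.getObjectInspector(file.filePath, Some(conf))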

Added SparkOrcNewRecordReader, which is very similar to OrcNewInputFormat.OrcRecordReader but also exposes the ObjectInspector. This eliminates the need to look up the file again later to generate the object inspector, and is especially useful for partitioned tables/datasets.
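
In effect, the per-file setup now boils down to the following sketch (the openOrcFile helper and its parameter list are illustrative only; the real change is in OrcRelation.scala in the diff below):

    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.ql.io.orc.{OrcFile, OrcStruct, SparkOrcNewRecordReader}
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapreduce.RecordReader

    // One ORC footer read yields both the record reader and the ObjectInspector,
    // so unwrapOrcStructs no longer has to re-open the file.
    def openOrcFile(
        filePath: String,
        start: Long,
        length: Long,
        conf: Configuration): (RecordReader[NullWritable, OrcStruct], StructObjectInspector) = {
      val orcReader = OrcFile.createReader(new Path(new URI(filePath)), OrcFile.readerOptions(conf))
      val recordReader = new SparkOrcNewRecordReader(orcReader, conf, start, length)
      val structOI = recordReader.getObjectInspector.asInstanceOf[StructObjectInspector]
      (recordReader, structOI)
    }

The StructObjectInspector is then passed into OrcRelation.unwrapOrcStructs as an argument (see the updated signature in the diff) instead of being looked up again from the file path.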

## How was this patch tested?
Ran TPC-DS queries manually and also verified by running org.apache.spark.sql.hive.orc.OrcSuite, org.apache.spark.sql.hive.orc.OrcQuerySuite, org.apache.spark.sql.hive.orc.OrcPartitionDiscoverySuite, OrcHadoopFsRelationSuite, and org.apache.spark.sql.hive.execution.HiveCompatibilitySuite.

…SourceStrategy mode

Author: Rajesh Balamohan <rb...@apache.org>

Closes #12319 from rajeshbalamohan/SPARK-14551.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5226e30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5226e30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5226e30

Branch: refs/heads/master
Commit: e5226e3007d6645c6d48d3c1b2762566184f3fc7
Parents: 95faa73
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Apr 22 22:51:40 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Apr 22 22:51:40 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/io/orc/SparkOrcNewRecordReader.java | 94 ++++++++++++++++++++
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 25 +++---
 2 files changed, 108 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e5226e30/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
new file mode 100644
index 0000000..f093637
--- /dev/null
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This is based on hive-exec-1.2.1
+ * {@link org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat.OrcRecordReader}.
+ * This class exposes getObjectInspector which can be used for reducing
+ * NameNode calls in OrcRelation.
+ */
+public class SparkOrcNewRecordReader extends
+    org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> {
+  private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+  private final int numColumns;
+  OrcStruct value;
+  private float progress = 0.0f;
+  private ObjectInspector objectInspector;
+
+  public SparkOrcNewRecordReader(Reader file, Configuration conf,
+      long offset, long length) throws IOException {
+    List<OrcProto.Type> types = file.getTypes();
+    numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+    value = new OrcStruct(numColumns);
+    this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
+        length);
+    this.objectInspector = file.getObjectInspector();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public OrcStruct getCurrentValue() throws IOException,
+      InterruptedException {
+    return value;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (reader.hasNext()) {
+      reader.next(value);
+      progress = reader.getProgress();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public ObjectInspector getObjectInspector() {
+    return objectInspector;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e5226e30/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 21591ec..b0f32fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -145,20 +144,24 @@ private[sql] class DefaultSource
           val job = Job.getInstance(conf)
           FileInputFormat.setInputPaths(job, file.filePath)
 
-          val inputFormat = new OrcNewInputFormat
           val fileSplit = new FileSplit(
             new Path(new URI(file.filePath)), file.start, file.length, Array.empty
           )
-
-          val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-          val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-          inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
+          // The custom SparkOrcNewRecordReader obtains the ObjectInspector
+          // when the record reader is created, avoiding a per-file NameNode
+          // call in unwrapOrcStructs. This is especially helpful for
+          // partitioned datasets.
+          val orcReader = OrcFile.createReader(
+            new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+          new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
         }
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
-          file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
-        )
+          conf,
+          requiredSchema,
+          Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
+          new RecordReaderIterator[OrcStruct](orcRecordReader))
 
         // Appends partition values
         val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -322,10 +325,11 @@ private[orc] case class OrcTableScan(
 
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val writableIterator = iterator.map(_._2)
+      val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
       OrcRelation.unwrapOrcStructs(
-        split.getPath.toString,
         wrappedConf.value,
         StructType.fromAttributes(attributes),
+        maybeStructOI,
         writableIterator
       )
     }
@@ -355,12 +359,11 @@ private[orc] object OrcRelation extends HiveInspectors {
   )
 
   def unwrapOrcStructs(
-      filePath: String,
       conf: Configuration,
       dataSchema: StructType,
+      maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf))
     val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
     val unsafeProjection = UnsafeProjection.create(dataSchema)
 

