Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/10 21:41:17 UTC

spark git commit: [SPARK-17993][SQL] Fix Parquet log output redirection

Repository: spark
Updated Branches:
  refs/heads/master 16eaad9da -> b533fa2b2


[SPARK-17993][SQL] Fix Parquet log output redirection

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993)
## What changes were proposed in this pull request?

PR #14690 broke Parquet log output redirection for converted, partitioned Hive tables. For example, when querying Parquet files written by Parquet-mr 1.6.0, Spark prints a torrent of (harmless) warning messages from the Parquet reader:

```
Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0
org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\)
    at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
    at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60)
    at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

This happens only during execution, not planning, and it doesn't matter what log level the `SparkContext` is set to. That's because Parquet (in versions before 1.9) logs through `java.util.logging` (JUL) rather than SLF4J. Note that you can tell log redirection is not working here because the log messages do not conform to the default Spark log format.
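
For reference, the redirection technique the fix relies on (shown in full in the diff below) amounts to swapping a JUL logger's handlers for the SLF4J bridge handler. A minimal Scala sketch, assuming the `jul-to-slf4j` bridge is on the classpath:

```scala
import java.util.logging.{Logger => JLogger}
import org.slf4j.bridge.SLF4JBridgeHandler

// Remove Parquet's default JUL handlers (which print directly to stdout)
// and install the SLF4J bridge so log records flow into Spark's log4j
// configuration instead.
def redirect(logger: JLogger): Unit = {
  logger.getHandlers.foreach(logger.removeHandler)
  logger.setUseParentHandlers(false)
  logger.addHandler(new SLF4JBridgeHandler)
}

redirect(JLogger.getLogger("parquet"))            // parquet-mr <= 1.6.0
redirect(JLogger.getLogger("org.apache.parquet")) // parquet-mr 1.7/1.8
```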

This is a regression I had noted as something we needed to fix in a follow-up.

It appears that the problem arose because we removed the call to `inferSchema` during Hive table conversion. That call is what triggered the output redirection.
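
The fix instead ties redirection to class initialization: a serializable singleton performs the redirection in its static initializer, and client classes hold a reference to it so the initializer runs whether the client is constructed on the driver or deserialized on an executor. A minimal Scala sketch of the idiom, using a hypothetical `SomeFileFormat` stand-in:

```scala
// Hypothetical stand-in for ParquetFileFormat, illustrating the idiom the
// fix uses. Because the field holds the serializable INSTANCE singleton,
// deserializing a SomeFileFormat on an executor also deserializes a
// ParquetLogRedirector, which loads that class and runs its static
// redirection block even though no SomeFileFormat constructor ever runs.
class SomeFileFormat extends Serializable {
  // Never read; its only purpose is to force ParquetLogRedirector class
  // initialization on both construction and deserialization.
  private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
}
```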

## How was this patch tested?

I tested this manually in four ways:
1. Executing `spark.sqlContext.range(10).selectExpr("id as a").write.mode("overwrite").parquet("test")`.
2. Executing `spark.read.format("parquet").load(legacyParquetFile).show` for a Parquet file `legacyParquetFile` written using Parquet-mr 1.6.0.
3. Executing `select * from legacy_parquet_table limit 1` for some unpartitioned Parquet-based Hive table written using Parquet-mr 1.6.0.
4. Executing `select * from legacy_partitioned_parquet_table where partcol=x limit 1` for some partitioned Parquet-based Hive table written using Parquet-mr 1.6.0.

I ran each test with a new instance of `spark-shell` or `spark-sql`.
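
For example, test 2 looks roughly like this in a fresh `spark-shell` (the path is a placeholder for a file written by Parquet-mr 1.6.0):

```scala
// Placeholder path; point this at a Parquet file written by parquet-mr 1.6.0.
val legacyParquetFile = "/path/to/parquet-mr-1.6.0-file"

// With redirection working, any CorruptStatistics warnings emitted while
// scanning show up in Spark's log4j format, at the configured log level,
// rather than as raw JUL output.
spark.read.format("parquet").load(legacyParquetFile).show()
```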

Incidentally, I found that test case 3 was not a regression: redirection was not occurring in the master codebase prior to #14690.

I spent some time working on a unit test, but based on my experience with this ticket I feel that automated testing here is far from feasible.

cc ericl dongjoon-hyun

Author: Michael Allman <mi...@videoamp.com>

Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b533fa2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b533fa2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b533fa2b

Branch: refs/heads/master
Commit: b533fa2b205544b42dcebe0a6fee9d8275f6da7d
Parents: 16eaad9
Author: Michael Allman <mi...@videoamp.com>
Authored: Thu Nov 10 13:41:13 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Nov 10 13:41:13 2016 -0800

----------------------------------------------------------------------
 .../parquet/ParquetLogRedirector.java           | 72 ++++++++++++++++++++
 .../datasources/parquet/ParquetFileFormat.scala | 58 ++++------------
 sql/core/src/test/resources/log4j.properties    |  4 +-
 sql/hive/src/test/resources/log4j.properties    |  4 ++
 4 files changed, 90 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b533fa2b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
new file mode 100644
index 0000000..7a7f32e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.Serializable;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
+
+import org.apache.parquet.Log;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+// Redirects the JUL logging for parquet-mr versions <= 1.8 to SLF4J logging using
+// SLF4JBridgeHandler. Parquet-mr versions >= 1.9 use SLF4J directly
+final class ParquetLogRedirector implements Serializable {
+  // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
+  // especially important for Serializable classes where fields are set but constructors are
+  // ignored
+  static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();
+
+  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
+  // However, the root JUL logger used by Parquet isn't properly referenced.  Here we keep
+  // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8
+  private static final Logger apacheParquetLogger =
+    Logger.getLogger(Log.class.getPackage().getName());
+  private static final Logger parquetLogger = Logger.getLogger("parquet");
+
+  static {
+    // For parquet-mr 1.7 and 1.8, which are under `org.apache.parquet` namespace.
+    try {
+      Class.forName(Log.class.getName());
+      redirect(Logger.getLogger(Log.class.getPackage().getName()));
+    } catch (ClassNotFoundException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
+    // namespace.
+    try {
+      Class.forName("parquet.Log");
+      redirect(Logger.getLogger("parquet"));
+    } catch (Throwable t) {
+      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
+      // when Spark is built with SBT. So `parquet.Log` may not be found.  This try/catch block
+      // should be removed after this issue is fixed.
+    }
+  }
+
+  private ParquetLogRedirector() {
+  }
+
+  private static void redirect(Logger logger) {
+    for (Handler handler : logger.getHandlers()) {
+      logger.removeHandler(handler);
+    }
+    logger.setUseParentHandlers(false);
+    logger.addHandler(new SLF4JBridgeHandler());
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b533fa2b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b8ea7f4..031a0fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.logging.{Logger => JLogger}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -29,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
-import org.slf4j.bridge.SLF4JBridgeHandler
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
@@ -56,6 +53,11 @@ class ParquetFileFormat
   with DataSourceRegister
   with Logging
   with Serializable {
+  // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
+  // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
+  // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
+  // here.
+  private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
 
   override def shortName(): String = "parquet"
 
@@ -129,10 +131,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
-    ParquetFileFormat.redirectParquetLogs()
-
     new OutputWriterFactory {
-      override def newInstance(
+      // This OutputWriterFactory instance is deserialized when writing Parquet files on the
+      // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
+      // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
+      // initialized.
+      private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
+
+      override def newInstance(
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
@@ -673,44 +679,4 @@ object ParquetFileFormat extends Logging {
         Failure(cause)
     }.toOption
   }
-
-  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
-  // However, the root JUL logger used by Parquet isn't properly referenced.  Here we keep
-  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
-  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
-  val parquetLogger: JLogger = JLogger.getLogger("parquet")
-
-  // Parquet initializes its own JUL logger in a static block which always prints to stdout.  Here
-  // we redirect the JUL logger via SLF4J JUL bridge handler.
-  val redirectParquetLogsViaSLF4J: Unit = {
-    def redirect(logger: JLogger): Unit = {
-      logger.getHandlers.foreach(logger.removeHandler)
-      logger.setUseParentHandlers(false)
-      logger.addHandler(new SLF4JBridgeHandler)
-    }
-
-    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
-    // scalastyle:off classforname
-    Class.forName(classOf[ApacheParquetLog].getName)
-    // scalastyle:on classforname
-    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
-
-    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
-    // namespace.
-    try {
-      // scalastyle:off classforname
-      Class.forName("parquet.Log")
-      // scalastyle:on classforname
-      redirect(JLogger.getLogger("parquet"))
-    } catch { case _: Throwable =>
-      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
-      // when Spark is built with SBT. So `parquet.Log` may not be found.  This try/catch block
-      // should be removed after this issue is fixed.
-    }
-  }
-
-  /**
-   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
-   */
-  def redirectParquetLogs(): Unit = {}
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b533fa2b/sql/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 33b9ecf..25b8173 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
 # Parquet related logging
-log4j.logger.org.apache.parquet.hadoop=WARN
-log4j.logger.org.apache.spark.sql.parquet=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR

http://git-wip-us.apache.org/repos/asf/spark/blob/b533fa2b/sql/hive/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index fea3404..072bb25 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF
 
 log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
 log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
+
+# Parquet related logging
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR

