You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/02 04:37:50 UTC

[15/22] hive git commit: HIVE-11925 : Hive file format checking breaks load from named pipes (Sergey Shelukhin, reviewed by Ashutosh Chauhan)

HIVE-11925 : Hive file format checking breaks load from named pipes (Sergey Shelukhin, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82bc0e1c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82bc0e1c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82bc0e1c

Branch: refs/heads/llap
Commit: 82bc0e1c79ca656ec34a43efe4a8807f0f655e30
Parents: 24988f7
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Oct 1 12:42:28 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Oct 1 12:42:28 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/HiveFileFormatUtils.java  | 95 ++++++++++++++------
 .../hadoop/hive/ql/io/InputFormatChecker.java   |  5 +-
 .../hadoop/hive/ql/io/RCFileInputFormat.java    |  3 +-
 .../ql/io/SequenceFileInputFormatChecker.java   |  3 +-
 .../hive/ql/io/VectorizedRCFileInputFormat.java |  3 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  4 +-
 .../ql/io/orc/VectorizedOrcInputFormat.java     |  2 +-
 .../hive/ql/exec/TestFileSinkOperator.java      |  2 +-
 .../hive/ql/txn/compactor/CompactorTest.java    |  2 +-
 9 files changed, 80 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 50ba740..06d3df7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -19,8 +19,13 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.nio.file.FileSystemNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -28,10 +33,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -69,6 +77,7 @@ import org.apache.hive.common.util.ReflectionUtil;
  *
  */
 public final class HiveFileFormatUtils {
+  private static final Log LOG = LogFactory.getLog(HiveFileFormatUtils.class);
 
   static {
     outputFormatSubstituteMap =
@@ -177,44 +186,51 @@ public final class HiveFileFormatUtils {
    */
   @SuppressWarnings("unchecked")
   public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
-      Class<? extends InputFormat> inputFormatCls, ArrayList<FileStatus> files)
+      Class<? extends InputFormat> inputFormatCls, List<FileStatus> files)
       throws HiveException {
-    if (files.size() > 0) {
-      Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
-      if (checkerCls == null
-          && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
-        // we get a text input format here, we can not determine a file is text
-        // according to its content, so we can do is to test if other file
-        // format can accept it. If one other file format can accept this file,
-        // we treat this file as text file, although it maybe not.
-        return checkTextInputFormat(fs, conf, files);
-      }
+    if (files.isEmpty()) return false;
+    Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
+    if (checkerCls == null
+        && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
+      // we get a text input format here; we cannot determine whether a file is
+      // text from its content, so all we can do is test whether any other file
+      // format accepts it. If no other file format accepts this file,
+      // we treat it as a text file, although it may not be one.
+      return checkTextInputFormat(fs, conf, files);
+    }
 
-      if (checkerCls != null) {
-        InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
-            .get(checkerCls);
-        try {
-          if (checkerInstance == null) {
-            checkerInstance = checkerCls.newInstance();
-            inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
-          }
-          return checkerInstance.validateInput(fs, conf, files);
-        } catch (Exception e) {
-          throw new HiveException(e);
+    if (checkerCls != null) {
+      InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache.get(checkerCls);
+      try {
+        if (checkerInstance == null) {
+          checkerInstance = checkerCls.newInstance();
+          inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
         }
+        return checkerInstance.validateInput(fs, conf, files);
+      } catch (Exception e) {
+        throw new HiveException(e);
       }
-      return true;
     }
-    return false;
+    return true;
   }
 
   @SuppressWarnings("unchecked")
   private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf,
-      ArrayList<FileStatus> files) throws HiveException {
-    Set<Class<? extends InputFormat>> inputFormatter = inputFormatCheckerMap
-        .keySet();
+      List<FileStatus> files) throws HiveException {
+    List<FileStatus> files2 = new LinkedList<>(files);
+    Iterator<FileStatus> iter = files2.iterator();
+    while (iter.hasNext()) {
+      FileStatus file = iter.next();
+      if (file == null) continue;
+      if (isPipe(fs, file)) {
+        LOG.info("Skipping format check for " + file.getPath() + " as it is a pipe");
+        iter.remove();
+      }
+    }
+    if (files2.isEmpty()) return true;
+    Set<Class<? extends InputFormat>> inputFormatter = inputFormatCheckerMap.keySet();
     for (Class<? extends InputFormat> reg : inputFormatter) {
-      boolean result = checkInputFormat(fs, conf, reg, files);
+      boolean result = checkInputFormat(fs, conf, reg, files2);
       if (result) {
         return false;
       }
@@ -222,6 +238,29 @@ public final class HiveFileFormatUtils {
     return true;
   }
 
+  // See include/uapi/linux/stat.h
+  private static final int S_IFIFO = 0010000;
+  private static boolean isPipe(FileSystem fs, FileStatus file) {
+    if (fs instanceof DistributedFileSystem) {
+      return false; // Shortcut for HDFS.
+    }
+    int mode = 0;
+    Object pathToLog = file.getPath();
+    try {
+      java.nio.file.Path realPath = Paths.get(file.getPath().toUri());
+      pathToLog = realPath;
+      mode = (Integer)Files.getAttribute(realPath, "unix:mode");
+    } catch (FileSystemNotFoundException t) {
+      return false; // Probably not a local filesystem; no need to check.
+    } catch (UnsupportedOperationException | IOException
+        | SecurityException | IllegalArgumentException t) {
+      LOG.info("Failed to check mode for " + pathToLog + ": "
+        + t.getMessage() + " (" + t.getClass() + ")");
+      return false;
+    }
+    return (mode & S_IFIFO) != 0;
+  }
+
   public static RecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class<? extends Writable> outputClass,
       FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
index 3945411..129b834 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +34,6 @@ public interface InputFormatChecker {
    * This method is used to validate the input files.
    * 
    */
-  boolean validateInput(FileSystem fs, HiveConf conf,
-      ArrayList<FileStatus> files) throws IOException;
+  boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
index 88198ed..6004db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,7 +60,7 @@ public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWr
 
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-      ArrayList<FileStatus> files) throws IOException {
+      List<FileStatus> files) throws IOException {
     if (files.size() <= 0) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
index e2666d7..6cb46c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +36,7 @@ public class SequenceFileInputFormatChecker implements InputFormatChecker {
 
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-      ArrayList<FileStatus> files) throws IOException {
+      List<FileStatus> files) throws IOException {
     if (files.size() <= 0) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
index faad5f2..e9e1d5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,7 +57,7 @@ public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, V
 
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-      ArrayList<FileStatus> files) throws IOException {
+      List<FileStatus> files) throws IOException {
     if (files.size() <= 0) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index c45b6e6..57bde3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -106,7 +106,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * that added this event. Insert and update events include the entire row, while
  * delete events have null for row.
  */
-public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   InputFormatChecker, VectorizedInputFormatInterface,
     AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
 
@@ -395,7 +395,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   }
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-                               ArrayList<FileStatus> files
+                               List<FileStatus> files
                               ) throws IOException {
 
     if (Utilities.isVectorMode(conf)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 3992d8c..bf09001 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -158,7 +158,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
 
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-      ArrayList<FileStatus> files
+      List<FileStatus> files
       ) throws IOException {
     if (files.size() <= 0) {
       return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 9e89376..4594836 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -704,7 +704,7 @@ public class TestFileSinkOperator {
     }
 
     @Override
-    public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+    public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws
         IOException {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/82bc0e1c/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 21adc9d..5a8c932 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -361,7 +361,7 @@ public abstract class CompactorTest {
     }
 
     @Override
-    public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+    public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws
         IOException {
       return false;
     }