You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/01 23:18:00 UTC
hive git commit: HIVE-11925 : Hive file format checking breaks load
from named pipes (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/branch-1 fda7c5175 -> ca20049ff
HIVE-11925 : Hive file format checking breaks load from named pipes (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ca20049f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ca20049f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ca20049f
Branch: refs/heads/branch-1
Commit: ca20049ff83db9e111afb47b2e8d93ad6e366299
Parents: fda7c51
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Oct 1 12:42:28 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Oct 1 14:19:01 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/io/HiveFileFormatUtils.java | 95 ++++++++++++++------
.../hadoop/hive/ql/io/InputFormatChecker.java | 5 +-
.../hadoop/hive/ql/io/RCFileInputFormat.java | 3 +-
.../ql/io/SequenceFileInputFormatChecker.java | 3 +-
.../hive/ql/io/VectorizedRCFileInputFormat.java | 3 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +-
.../ql/io/orc/VectorizedOrcInputFormat.java | 2 +-
.../hive/ql/exec/TestFileSinkOperator.java | 2 +-
.../hive/ql/txn/compactor/CompactorTest.java | 2 +-
9 files changed, 80 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 50ba740..06d3df7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -19,8 +19,13 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.nio.file.FileSystemNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -28,10 +33,13 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -69,6 +77,7 @@ import org.apache.hive.common.util.ReflectionUtil;
*
*/
public final class HiveFileFormatUtils {
+ private static final Log LOG = LogFactory.getLog(HiveFileFormatUtils.class);
static {
outputFormatSubstituteMap =
@@ -177,44 +186,51 @@ public final class HiveFileFormatUtils {
*/
@SuppressWarnings("unchecked")
public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
- Class<? extends InputFormat> inputFormatCls, ArrayList<FileStatus> files)
+ Class<? extends InputFormat> inputFormatCls, List<FileStatus> files)
throws HiveException {
- if (files.size() > 0) {
- Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
- if (checkerCls == null
- && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
- // we get a text input format here, we can not determine a file is text
- // according to its content, so we can do is to test if other file
- // format can accept it. If one other file format can accept this file,
- // we treat this file as text file, although it maybe not.
- return checkTextInputFormat(fs, conf, files);
- }
+ if (files.isEmpty()) return false;
+ Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
+ if (checkerCls == null
+ && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
+ // We were given a text input format. We cannot determine whether a file is
+ // text from its content, so all we can do is test whether any other
+ // registered file format accepts it. If some other format accepts the file, it
+ // is not text; otherwise we treat it as a text file, although it may not be one.
+ return checkTextInputFormat(fs, conf, files);
+ }
- if (checkerCls != null) {
- InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
- .get(checkerCls);
- try {
- if (checkerInstance == null) {
- checkerInstance = checkerCls.newInstance();
- inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
- }
- return checkerInstance.validateInput(fs, conf, files);
- } catch (Exception e) {
- throw new HiveException(e);
+ if (checkerCls != null) {
+ InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache.get(checkerCls);
+ try {
+ if (checkerInstance == null) {
+ checkerInstance = checkerCls.newInstance();
+ inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance);
}
+ return checkerInstance.validateInput(fs, conf, files);
+ } catch (Exception e) {
+ throw new HiveException(e);
}
- return true;
}
- return false;
+ return true;
}
@SuppressWarnings("unchecked")
private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files) throws HiveException {
- Set<Class<? extends InputFormat>> inputFormatter = inputFormatCheckerMap
- .keySet();
+ List<FileStatus> files) throws HiveException {
+ List<FileStatus> files2 = new LinkedList<>(files);
+ Iterator<FileStatus> iter = files2.iterator();
+ while (iter.hasNext()) {
+ FileStatus file = iter.next();
+ if (file == null) continue;
+ if (isPipe(fs, file)) {
+ LOG.info("Skipping format check for " + file.getPath() + " as it is a pipe");
+ iter.remove();
+ }
+ }
+ if (files2.isEmpty()) return true;
+ Set<Class<? extends InputFormat>> inputFormatter = inputFormatCheckerMap.keySet();
for (Class<? extends InputFormat> reg : inputFormatter) {
- boolean result = checkInputFormat(fs, conf, reg, files);
+ boolean result = checkInputFormat(fs, conf, reg, files2);
if (result) {
return false;
}
@@ -222,6 +238,29 @@ public final class HiveFileFormatUtils {
return true;
}
+ // File-type bit for a FIFO (named pipe) in a Unix st_mode, in octal. See include/uapi/linux/stat.h
+ private static final int S_IFIFO = 0010000;
+ private static boolean isPipe(FileSystem fs, FileStatus file) {
+ if (fs instanceof DistributedFileSystem) {
+ return false; // Shortcut for HDFS.
+ }
+ int mode = 0;
+ Object pathToLog = file.getPath();
+ try {
+ java.nio.file.Path realPath = Paths.get(file.getPath().toUri());
+ pathToLog = realPath;
+ mode = (Integer)Files.getAttribute(realPath, "unix:mode");
+ } catch (FileSystemNotFoundException t) {
+ return false; // Probably not a local filesystem; no need to check.
+ } catch (UnsupportedOperationException | IOException
+ | SecurityException | IllegalArgumentException t) {
+ LOG.info("Failed to check mode for " + pathToLog + ": "
+ + t.getMessage() + " (" + t.getClass() + ")");
+ return false;
+ }
+ return (mode & S_IFIFO) != 0;
+ }
+
public static RecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
index 3945411..129b834 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +34,6 @@ public interface InputFormatChecker {
* This method is used to validate the input files.
*
*/
- boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files) throws IOException;
+ boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
index 88198ed..6004db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -59,7 +60,7 @@ public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWr
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files) throws IOException {
+ List<FileStatus> files) throws IOException {
if (files.size() <= 0) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
index e2666d7..6cb46c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -35,7 +36,7 @@ public class SequenceFileInputFormatChecker implements InputFormatChecker {
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files) throws IOException {
+ List<FileStatus> files) throws IOException {
if (files.size() <= 0) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
index faad5f2..e9e1d5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -56,7 +57,7 @@ public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, V
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files) throws IOException {
+ List<FileStatus> files) throws IOException {
if (files.size() <= 0) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index f078018..9a61ca0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -101,7 +101,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* that added this event. Insert and update events include the entire row, while
* delete events have null for row.
*/
-public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface,
AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
@@ -323,7 +323,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files
+ List<FileStatus> files
) throws IOException {
if (Utilities.isVectorMode(conf)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 3992d8c..bf09001 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -158,7 +158,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files
+ List<FileStatus> files
) throws IOException {
if (files.size() <= 0) {
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index c6ae030..5d140b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -572,7 +572,7 @@ public class TestFileSinkOperator {
}
@Override
- public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+ public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws
IOException {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 21adc9d..5a8c932 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -361,7 +361,7 @@ public abstract class CompactorTest {
}
@Override
- public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+ public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws
IOException {
return false;
}