You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2015/08/04 02:16:14 UTC

svn commit: r1693986 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/OrcStorage.java src/org/apache/pig/impl/util/Utils.java

Author: daijy
Date: Tue Aug  4 00:16:13 2015
New Revision: 1693986

URL: http://svn.apache.org/r1693986
Log:
PIG-4624: Error on ORC empty file without schema

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1693986&r1=1693985&r2=1693986&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug  4 00:16:13 2015
@@ -32,6 +32,8 @@ PIG-4570: Allow AvroStorage to use a cla
 
 BUG FIXES
 
+PIG-4624: Error on ORC empty file without schema (daijy)
+
 PIG-3622: Allow casting bytearray fields to bytearray type (redisliu via daijy)
 
 PIG-4618: When use tez as the engine , set pig.user.cache.enabled=true do not take effect (wisgood via rohini)

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1693986&r1=1693985&r2=1693986&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Tue Aug  4 00:16:13 2015
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
@@ -406,7 +407,7 @@ public class OrcStorage extends LoadFunc
         return FuncUtils.getShipFiles(classList);
     }
 
-    private static Path getFirstFile(String location, FileSystem fs) throws IOException {
+    private static Path getFirstFile(String location, FileSystem fs, PathFilter filter) throws IOException {
         String[] locations = getPathStrings(location);
         Path[] paths = new Path[locations.length];
         for (int i = 0; i < paths.length; ++i) {
@@ -423,7 +424,7 @@ public class OrcStorage extends LoadFunc
         }
         FileStatus[] statusArray = (FileStatus[]) statusList
                 .toArray(new FileStatus[statusList.size()]);
-        Path p = Utils.depthFirstSearchForFile(statusArray, fs);
+        Path p = Utils.depthFirstSearchForFile(statusArray, fs, filter);
         return p;
     }
 
@@ -456,7 +457,7 @@ public class OrcStorage extends LoadFunc
 
     private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
         FileSystem fs = FileSystem.get(job.getConfiguration());
-        Path path = getFirstFile(location, fs);
+        Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
         if (path == null) {
             log.info("Cannot find any ORC files from " + location +
                     ". Probably multiple load store in script.");
@@ -467,6 +468,28 @@ public class OrcStorage extends LoadFunc
         return TypeInfoUtils.getTypeInfoFromObjectInspector(oip);
     }
 
+    public static class NonEmptyOrcFileFilter implements PathFilter {
+        private FileSystem fs;
+        public NonEmptyOrcFileFilter(FileSystem fs) {
+            this.fs = fs;
+        }
+        @Override
+        public boolean accept(Path path) {
+            Reader reader;
+            try {
+                reader = OrcFile.createReader(fs, path);
+                ObjectInspector oip = (ObjectInspector)reader.getObjectInspector();
+                ResourceFieldSchema rs = HiveUtils.getResourceFieldSchema(TypeInfoUtils.getTypeInfoFromObjectInspector(oip));
+                if (rs.getSchema().getFields().length!=0) {
+                    return true;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return false;
+        }
+    }
+
     @Override
     public ResourceStatistics getStatistics(String location, Job job)
             throws IOException {

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1693986&r1=1693985&r2=1693986&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Tue Aug  4 00:16:13 2015
@@ -623,16 +623,9 @@ public class Utils {
      * @throws IOException
      */
 
-    public static Path depthFirstSearchForFile(final FileStatus fileStatus,
-        final FileSystem fileSystem) throws IOException {
-      if (fileSystem.isFile(fileStatus.getPath())) {
-        return fileStatus.getPath();
-      } else {
-        return depthFirstSearchForFile(
-            fileSystem.listStatus(fileStatus.getPath(), VISIBLE_FILES),
-            fileSystem);
-      }
-
+    public static Path depthFirstSearchForFile(final FileStatus[] statusArray,
+            final FileSystem fileSystem) throws IOException {
+        return depthFirstSearchForFile(statusArray, fileSystem, null);
     }
 
     /**
@@ -644,7 +637,7 @@ public class Utils {
      * @throws IOException
      */
     public static Path depthFirstSearchForFile(final FileStatus[] statusArray,
-        final FileSystem fileSystem) throws IOException {
+        final FileSystem fileSystem, PathFilter filter) throws IOException {
 
       // Most recent files first
       Arrays.sort(statusArray,
@@ -657,10 +650,17 @@ public class Utils {
       );
 
       for (FileStatus f : statusArray) {
-        Path p = depthFirstSearchForFile(f, fileSystem);
-        if (p != null) {
-          return p;
-        }
+          if (fileSystem.isFile(f.getPath())) {
+              if (filter == null || filter.accept(f.getPath())) {
+                  return f.getPath();
+              } else {
+                  continue;
+              }
+            } else {
+              return depthFirstSearchForFile(
+                  fileSystem.listStatus(f.getPath(), VISIBLE_FILES),
+                  fileSystem, filter);
+            }
       }
 
       return null;