You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/18 03:46:53 UTC

svn commit: r1753140 - /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java

Author: xuefu
Date: Mon Jul 18 03:46:53 2016
New Revision: 1753140

URL: http://svn.apache.org/viewvc?rev=1753140&view=rev
Log:
PIG-4941: TestRank3#testRankWithSplitInMap hangs after upgrade to spark 1.6.1 (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1753140&r1=1753139&r2=1753140&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Mon Jul 18 03:46:53 2016
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -125,6 +126,12 @@ public class PigSplit extends InputSplit
      */
     String[] locations = null;
 
+
+    /**
+     * Cached result of {@link #getLocationInfo()}; null until that method has computed it.
+     */
+    SplitLocationInfo[] splitLocationInfos = null;
+
     // this seems necessary for Hadoop to instatiate this split on the
     // backend
     public PigSplit() {}
@@ -201,6 +208,51 @@ public class PigSplit extends InputSplit
         return locations;
     }
 
+
+    @Override
+    public SplitLocationInfo[] getLocationInfo() throws IOException {
+        if (splitLocationInfos == null) {
+            HashMap<SplitLocationInfo, Long> locMap = new HashMap<SplitLocationInfo, Long>();
+            Long lenInMap;
+            for (InputSplit split : wrappedSplits) {
+                SplitLocationInfo[] locs = split.getLocationInfo();
+                if( locs != null) {
+                    for (SplitLocationInfo loc : locs) {
+                        try {
+                            if ((lenInMap = locMap.get(loc)) == null)
+                                locMap.put(loc, split.getLength());
+                            else
+                                locMap.put(loc, lenInMap + split.getLength());
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException("InputSplit.getLength throws exception: ", e);
+                        }
+                    }
+                }
+            }
+            Set<Map.Entry<SplitLocationInfo, Long>> entrySet = locMap.entrySet();
+            Map.Entry<SplitLocationInfo, Long>[] hostSize =
+                    entrySet.toArray(new Map.Entry[entrySet.size()]);
+            Arrays.sort(hostSize, new Comparator<Map.Entry<SplitLocationInfo, Long>>() {
+
+                @Override
+                public int compare(Entry<SplitLocationInfo, Long> o1, Entry<SplitLocationInfo, Long> o2) {
+                    long diff = o1.getValue() - o2.getValue();
+                    if (diff < 0) return 1;
+                    if (diff > 0) return -1;
+                    return 0;
+                }
+            });
+            // maximum 5 locations are in list: refer to PIG-1648 for more details
+            int nHost = Math.min(hostSize.length, 5);
+            splitLocationInfos = new SplitLocationInfo[nHost];
+            for (int i = 0; i < nHost; ++i) {
+                splitLocationInfos[i] = hostSize[i].getKey();
+            }
+        }
+        return splitLocationInfos;
+    }
+
+
     @Override
     public long getLength() throws IOException, InterruptedException {
         if (length == -1) {