You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/18 03:46:53 UTC
svn commit: r1753140 -
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
Author: xuefu
Date: Mon Jul 18 03:46:53 2016
New Revision: 1753140
URL: http://svn.apache.org/viewvc?rev=1753140&view=rev
Log:
PIG-4941: TestRank3#testRankWithSplitInMap hangs after upgrade to spark 1.6.1 (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1753140&r1=1753139&r2=1753140&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Mon Jul 18 03:46:53 2016
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -125,6 +126,12 @@ public class PigSplit extends InputSplit
*/
String[] locations = null;
+
+ /**
+ * Location infos aggregated over all wrapped splits; computed lazily and cached by getLocationInfo()
+ */
+ SplitLocationInfo[] splitLocationInfos = null;
+
// This no-arg constructor seems necessary for Hadoop to instantiate this
// split on the backend.
public PigSplit() {}
@@ -201,6 +208,51 @@ public class PigSplit extends InputSplit
return locations;
}
+
+ @Override
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ if (splitLocationInfos == null) {
+ HashMap<SplitLocationInfo, Long> locMap = new HashMap<SplitLocationInfo, Long>();
+ Long lenInMap;
+ for (InputSplit split : wrappedSplits) {
+ SplitLocationInfo[] locs = split.getLocationInfo();
+ if( locs != null) {
+ for (SplitLocationInfo loc : locs) {
+ try {
+ if ((lenInMap = locMap.get(loc)) == null)
+ locMap.put(loc, split.getLength());
+ else
+ locMap.put(loc, lenInMap + split.getLength());
+ } catch (InterruptedException e) {
+ throw new RuntimeException("InputSplit.getLength throws exception: ", e);
+ }
+ }
+ }
+ }
+ Set<Map.Entry<SplitLocationInfo, Long>> entrySet = locMap.entrySet();
+ Map.Entry<SplitLocationInfo, Long>[] hostSize =
+ entrySet.toArray(new Map.Entry[entrySet.size()]);
+ Arrays.sort(hostSize, new Comparator<Map.Entry<SplitLocationInfo, Long>>() {
+
+ @Override
+ public int compare(Entry<SplitLocationInfo, Long> o1, Entry<SplitLocationInfo, Long> o2) {
+ long diff = o1.getValue() - o2.getValue();
+ if (diff < 0) return 1;
+ if (diff > 0) return -1;
+ return 0;
+ }
+ });
+ // maximum 5 locations are in list: refer to PIG-1648 for more details
+ int nHost = Math.min(hostSize.length, 5);
+ splitLocationInfos = new SplitLocationInfo[nHost];
+ for (int i = 0; i < nHost; ++i) {
+ splitLocationInfos[i] = hostSize[i].getKey();
+ }
+ }
+ return splitLocationInfos;
+ }
+
+
@Override
public long getLength() throws IOException, InterruptedException {
if (length == -1) {