You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/08/16 10:15:25 UTC
svn commit: r1514611 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project: CHANGES.txt
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
Author: sandy
Date: Fri Aug 16 08:15:25 2013
New Revision: 1514611
URL: http://svn.apache.org/r1514611
Log:
MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of indexes for better cache performance. (Sandy Ryza)
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1514611&r1=1514610&r2=1514611&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Fri Aug 16 08:15:25 2013
@@ -13,6 +13,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5446. TestJobHistoryEvents and TestJobHistoryParsing have race
conditions (jlowe via kihwal)
+ MAPREDUCE-5462. In map-side sort, swap entire meta entries instead of
+ indexes for better cache performance. (Sandy Ryza)
+
BUG FIXES
MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1514611&r1=1514610&r2=1514611&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Fri Aug 16 08:15:25 2013
@@ -883,10 +883,10 @@ public class MapTask extends Task {
byte[] kvbuffer; // main output buffer
private final byte[] b0 = new byte[0];
- private static final int INDEX = 0; // index offset in acct
- private static final int VALSTART = 1; // val offset in acct
- private static final int KEYSTART = 2; // key offset in acct
- private static final int PARTITION = 3; // partition offset in acct
+ private static final int VALSTART = 0; // val offset in acct
+ private static final int KEYSTART = 1; // key offset in acct
+ private static final int PARTITION = 2; // partition offset in acct
+ private static final int VALLEN = 3; // length of value
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
@@ -1150,10 +1150,10 @@ public class MapTask extends Task {
distanceTo(keystart, valend, bufvoid));
// write accounting info
- kvmeta.put(kvindex + INDEX, kvindex);
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
+ kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
@@ -1223,17 +1223,11 @@ public class MapTask extends Task {
}
/**
- * For the given meta position, return the dereferenced position in the
- * integer array. Each meta block contains several integers describing
- * record data in its serialized form, but the INDEX is not necessarily
- * related to the proximate metadata. The index value at the referenced int
- * position is the start offset of the associated metadata block. So the
- * metadata INDEX at metapos may point to the metadata described by the
- * metadata block at metapos + k, which contains information about that
- * serialized record.
+ * For the given meta position, return the offset into the int-sized
+ * kvmeta buffer.
*/
int offsetFor(int metapos) {
- return kvmeta.get(metapos * NMETA + INDEX);
+ return metapos * NMETA;
}
/**
@@ -1259,16 +1253,17 @@ public class MapTask extends Task {
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
}
+ final byte META_BUFFER_TMP[] = new byte[METASIZE];
/**
- * Swap logical indices st i, j MOD offset capacity.
+ * Swap metadata for items i, j
* @see IndexedSortable#swap
*/
public void swap(final int mi, final int mj) {
- final int kvi = (mi % maxRec) * NMETA + INDEX;
- final int kvj = (mj % maxRec) * NMETA + INDEX;
- int tmp = kvmeta.get(kvi);
- kvmeta.put(kvi, kvmeta.get(kvj));
- kvmeta.put(kvj, tmp);
+ int iOff = (mi % maxRec) * METASIZE;
+ int jOff = (mj % maxRec) * METASIZE;
+ System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
+ System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
+ System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
}
/**
@@ -1600,9 +1595,9 @@ public class MapTask extends Task {
while (spindex < mend &&
kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
final int kvoff = offsetFor(spindex % maxRec);
- key.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
- (kvmeta.get(kvoff + VALSTART) -
- kvmeta.get(kvoff + KEYSTART)));
+ int keystart = kvmeta.get(kvoff + KEYSTART);
+ int valstart = kvmeta.get(kvoff + VALSTART);
+ key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
@@ -1728,14 +1723,8 @@ public class MapTask extends Task {
private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
// get the keystart for the next serialized value to be the end
// of this value. If this is the last value in the buffer, use bufend
- final int nextindex = kvoff == kvend
- ? bufend
- : kvmeta.get(
- (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
- // calculate the length of the value
- int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
- ? nextindex - kvmeta.get(kvoff + VALSTART)
- : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+ final int vallen = kvmeta.get(kvoff + VALLEN);
+ assert vallen >= 0;
vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
}