You are viewing a plain-text version of this content. (The hyperlink to the canonical version is not preserved in this text.)
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/24 19:40:58 UTC
[1/2] git commit: TEZ-950. Port MAPREDUCE-5462. Contributed by Gopal V and Siddharth Seth.
Repository: incubator-tez
Updated Branches:
refs/heads/master 8fab9244f -> a3a9ceaad
TEZ-950. Port MAPREDUCE-5462. Contributed by Gopal V and Siddharth Seth.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/be0b47a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/be0b47a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/be0b47a0
Branch: refs/heads/master
Commit: be0b47a0e58ccf166472e38bf15101434dd33b53
Parents: 8fab924
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 24 11:38:56 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 24 11:38:56 2014 -0700
----------------------------------------------------------------------
.../common/sort/impl/dflt/DefaultSorter.java | 56 ++++++++------------
1 file changed, 22 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/be0b47a0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index e311e93..95256d0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -80,10 +80,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
byte[] kvbuffer; // main output buffer
private final byte[] b0 = new byte[0];
- protected static final int INDEX = 0; // index offset in acct
- protected static final int VALSTART = 1; // val offset in acct
- protected static final int KEYSTART = 2; // key offset in acct
- protected static final int PARTITION = 3; // partition offset in acct
+ protected static final int VALSTART = 0; // val offset in acct
+ protected static final int KEYSTART = 1; // key offset in acct
+ protected static final int PARTITION = 2; // partition offset in acct
+ protected static final int VALLEN = 3; // length of value
protected static final int NMETA = 4; // num meta ints
protected static final int METASIZE = NMETA * 4; // size in bytes
@@ -293,10 +293,10 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
distanceTo(keystart, valend, bufvoid));
// write accounting info
- kvmeta.put(kvindex + INDEX, kvindex);
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
+ kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
@@ -362,17 +362,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
/**
- * For the given meta position, return the dereferenced position in the
- * integer array. Each meta block contains several integers describing
- * record data in its serialized form, but the INDEX is not necessarily
- * related to the proximate metadata. The index value at the referenced int
- * position is the start offset of the associated metadata block. So the
- * metadata INDEX at metapos may point to the metadata described by the
- * metadata block at metapos + k, which contains information about that
- * serialized record.
+ * For the given meta position, return the offset into the int-sized
+ * kvmeta buffer.
*/
int offsetFor(int metapos) {
- return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
+ return (metapos % maxRec) * NMETA;
+
}
/**
@@ -398,16 +393,17 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
}
+ final byte META_BUFFER_TMP[] = new byte[METASIZE];
/**
- * Swap logical indices st i, j MOD offset capacity.
+ * Swap metadata for items i,j
* @see IndexedSortable#swap
*/
public void swap(final int mi, final int mj) {
- final int kvi = (mi % maxRec) * NMETA + INDEX;
- final int kvj = (mj % maxRec) * NMETA + INDEX;
- int tmp = kvmeta.get(kvi);
- kvmeta.put(kvi, kvmeta.get(kvj));
- kvmeta.put(kvj, tmp);
+ int iOff = (mi % maxRec) * METASIZE;
+ int jOff = (mj % maxRec) * METASIZE;
+ System.arraycopy(kvbuffer, iOff, META_BUFFER_TMP, 0, METASIZE);
+ System.arraycopy(kvbuffer, jOff, kvbuffer, iOff, METASIZE);
+ System.arraycopy(META_BUFFER_TMP, 0, kvbuffer, jOff, METASIZE);
}
/**
@@ -753,11 +749,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
while (spindex < mend &&
kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
final int kvoff = offsetFor(spindex);
- key.reset(
- kvbuffer,
- kvmeta.get(kvoff + KEYSTART),
- (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
- );
+ int keystart = kvmeta.get(kvoff + KEYSTART);
+ int valstart = kvmeta.get(kvoff + VALSTART);
+ key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
@@ -901,15 +895,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
protected int getInMemVBytesLength(int kvoff) {
// get the keystart for the next serialized value to be the end
// of this value. If this is the last value in the buffer, use bufend
- final int nextindex = kvoff == kvend
- ? bufend
- : kvmeta.get(
- (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
- // calculate the length of the value
- int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
- ? nextindex - kvmeta.get(kvoff + VALSTART)
- : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
- return vallen;
+ final int vallen = kvmeta.get(kvoff + VALLEN);
+ assert vallen >= 0;
+ return vallen;
}
/**
[2/2] git commit: TEZ-951. Port MAPREDUCE-5028. Contributed by Gopal V and Siddharth Seth.
Posted by ss...@apache.org.
TEZ-951. Port MAPREDUCE-5028. Contributed by Gopal V and Siddharth Seth.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a3a9ceaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a3a9ceaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a3a9ceaa
Branch: refs/heads/master
Commit: a3a9ceaad6d94200889f30a8d41cd1d42689797f
Parents: be0b47a
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Mar 24 11:40:31 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Mar 24 11:40:31 2014 -0700
----------------------------------------------------------------------
.../library/common/sort/impl/dflt/DefaultSorter.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a3a9ceaa/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 95256d0..5264554 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -298,7 +298,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
- kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
+ kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity());
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
@@ -316,8 +316,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
equator = pos;
// set index prior to first entry, aligned at meta boundary
final int aligned = pos - (pos % METASIZE);
- kvindex =
- ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+ // Cast one of the operands to long to avoid integer overflow
+ kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
@@ -334,8 +334,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
bufstart = bufend = e;
final int aligned = e - (e % METASIZE);
// set start/end to point to first meta record
- kvstart = kvend =
- ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+ // Cast one of the operands to long to avoid integer overflow
+ kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
if (LOG.isInfoEnabled()) {
LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");