You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/03/13 03:56:29 UTC
tez git commit: TEZ-3650. Improve performance of
FetchStatsLogger#logIndividualFetchComplete (jeagles)
Repository: tez
Updated Branches:
refs/heads/master c11810440 -> a9af6cfcc
TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a9af6cfc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a9af6cfc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a9af6cfc
Branch: refs/heads/master
Commit: a9af6cfccb8069afb01cbac3d373ffa0e2ecba93
Parents: c118104
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Sun Mar 12 22:56:02 2017 -0500
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Sun Mar 12 22:56:02 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/util/FastNumberFormat.java | 55 +++++++++++
.../org/apache/tez/util/TestNumberFormat.java | 39 ++++++++
.../runtime/library/common/shuffle/Fetcher.java | 4 +-
.../library/common/shuffle/ShuffleUtils.java | 85 +++++++++++------
.../orderedgrouped/FetcherOrderedGrouped.java | 4 +-
.../common/shuffle/TestShuffleUtils.java | 6 +-
.../orderedgrouped/TestMergeManager.java | 98 ++++++++++----------
8 files changed, 209 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 48ccb54..54b17b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3650. Improve performance of FetchStatsLogger#logIndividualFetchComplete
TEZ-3655. Specify netty version instead of inheriting from hadoop dependency.
TEZ-3253. Remove special handling for last app attempt.
TEZ-3648. IFile.Write#close has an extra output stream flush
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java b/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java
new file mode 100644
index 0000000..f22fc64
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/util/FastNumberFormat.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+/**
+ * Small formatter that renders {@code long} values as plain decimal text,
+ * left-padded with zeros up to a configurable minimum number of integer
+ * digits. No grouping separators are emitted.
+ *
+ * <p>Every instance reuses a single scratch buffer between calls, so one
+ * instance must not be used by several threads at the same time.
+ */
+public class FastNumberFormat {
+
+  /** Largest number of decimal digits a {@code long} magnitude can have. */
+  public static final int MAX_COUNT = 19;
+  /** Scratch buffer, filled from the right with the digits of the value being formatted. */
+  private final char[] digits = new char[MAX_COUNT];
+  private int minimumIntegerDigits;
+
+  /**
+   * Creates a new formatter with no minimum width.
+   *
+   * @return a fresh, independent instance
+   */
+  public static FastNumberFormat getInstance() {
+    return new FastNumberFormat();
+  }
+
+  /**
+   * Sets the minimum number of digits to emit; shorter values are padded with
+   * leading zeros. Values below one behave like one.
+   *
+   * @param minimumIntegerDigits the minimum digit count
+   */
+  public void setMinimumIntegerDigits(int minimumIntegerDigits) {
+    this.minimumIntegerDigits = minimumIntegerDigits;
+  }
+
+  /**
+   * Appends the decimal representation of {@code source} to {@code sb}.
+   * Negative values are written as a leading {@code '-'} followed by the
+   * zero-padded magnitude.
+   *
+   * @param source the value to format
+   * @param sb the builder to append to
+   * @return {@code sb}, to allow call chaining
+   */
+  public StringBuilder format(long source, StringBuilder sb) {
+    int left = MAX_COUNT;
+    if (source < 0) {
+      sb.append('-');
+    } else {
+      // Digits are extracted from the negative value: negating Long.MIN_VALUE
+      // overflows, whereas every non-negative long can be negated safely.
+      source = -source;
+    }
+    while (source < 0) {
+      // source % 10 is in [-9, 0] here, so subtracting it yields the digit.
+      digits[--left] = (char) ('0' - (source % 10));
+      source /= 10;
+    }
+    int count = MAX_COUNT - left;
+    // Always emit at least one digit so that zero is rendered as "0", and pad
+    // straight into the builder so that a minimum width larger than MAX_COUNT
+    // cannot overrun the scratch buffer.
+    for (int padding = Math.max(minimumIntegerDigits, 1) - count; padding > 0; padding--) {
+      sb.append('0');
+    }
+    sb.append(digits, left, count);
+    return sb;
+  }
+
+  /**
+   * Formats {@code source} into a new string.
+   *
+   * @param source the value to format
+   * @return the decimal representation of {@code source}
+   */
+  public String format(long source) {
+    return format(source, new StringBuilder()).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java b/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java
new file mode 100644
index 0000000..c2f1185
--- /dev/null
+++ b/tez-common/src/test/java/org/apache/tez/util/TestNumberFormat.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.text.NumberFormat;
+
+/**
+ * Checks that {@link FastNumberFormat} produces the same text as
+ * {@link NumberFormat} (grouping disabled) for zero-padded long values.
+ */
+public class TestNumberFormat {
+
+  /** Compares both formatters over positive, zero and negative samples with a six digit minimum. */
+  @Test(timeout = 1000)
+  public void testLongWithPadding() throws Exception {
+    final int minDigits = 6;
+
+    NumberFormat reference = NumberFormat.getInstance();
+    reference.setGroupingUsed(false);
+    reference.setMinimumIntegerDigits(minDigits);
+
+    FastNumberFormat candidate = FastNumberFormat.getInstance();
+    candidate.setMinimumIntegerDigits(minDigits);
+
+    // -0 is the same long value as 0, so that entry repeats the zero case.
+    long[] samples = {
+        1, 23, 456, 7890, 12345, 678901, 2345689,
+        0, -0,
+        -1, -23, -456, -7890, -12345, -678901, -2345689};
+    for (int i = 0; i < samples.length; i++) {
+      String expected = reference.format(samples[i]);
+      String actual = candidate.format(samples[i]);
+      Assert.assertEquals("Number formats should be equal", expected, actual);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 6cbff94..9d1f42a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -814,11 +814,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
input, (int) decompressedLength, (int) compressedLength, codec,
ifileReadAhead, ifileReadAheadLength, LOG,
- fetchedInput.getInputAttemptIdentifier().toString());
+ fetchedInput.getInputAttemptIdentifier());
} else if (fetchedInput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
(host +":" +port), input, compressedLength, decompressedLength, LOG,
- fetchedInput.getInputAttemptIdentifier().toString(),
+ fetchedInput.getInputAttemptIdentifier(),
ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
} else {
throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 82e844d..caddbc8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -48,6 +48,7 @@ import org.apache.tez.http.SSLFactory;
import org.apache.tez.http.async.netty.AsyncHttpConnection;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+import org.apache.tez.util.FastNumberFormat;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +89,15 @@ public class ShuffleUtils {
return new DecimalFormat("0.00");
}
};
+ /**
+ * Per-thread {@link FastNumberFormat} that pads to a minimum of two integer
+ * digits. Each thread lazily receives its own instance through
+ * {@code initialValue()}.
+ */
+ static final ThreadLocal<FastNumberFormat> MBPS_FAST_FORMAT =
+ new ThreadLocal<FastNumberFormat>() {
+ @Override
+ protected FastNumberFormat initialValue() {
+ FastNumberFormat fmt = FastNumberFormat.getInstance();
+ // two digits so that single-digit values are rendered with a leading zero
+ fmt.setMinimumIntegerDigits(2);
+ return fmt;
+ }
+ };
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
throws IOException {
@@ -119,7 +129,7 @@ public class ShuffleUtils {
public static void shuffleToMemory(byte[] shuffleData,
InputStream input, int decompressedLength, int compressedLength,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
- Logger LOG, String identifier) throws IOException {
+ Logger LOG, InputAttemptIdentifier identifier) throws IOException {
try {
IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
ifileReadAhead, ifileReadAheadLength);
@@ -145,7 +155,7 @@ public class ShuffleUtils {
}
public static void shuffleToDisk(OutputStream output, String hostIdentifier,
- InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier,
+ InputStream input, long compressedLength, long decompressedLength, Logger LOG, InputAttemptIdentifier identifier,
boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException {
// Copy data to local-disk
long bytesLeft = compressedLength;
@@ -530,6 +540,20 @@ public class ShuffleUtils {
this.aggregateLogger = aggregateLogger;
}
+
+ /**
+  * Appends a compact, brace-delimited form of the given identifier to
+  * {@code sb}: input identifier, attempt number and path component, followed
+  * by the fetch type ordinal and spill event id when the fetch type is not
+  * {@code FINAL_MERGE_ENABLED}.
+  *
+  * @param inputAttemptIdentifier the identifier to render
+  * @param sb the builder to append to
+  * @return {@code sb}
+  */
+ private static StringBuilder toShortString(InputAttemptIdentifier inputAttemptIdentifier, StringBuilder sb) {
+   sb.append("{")
+       .append(inputAttemptIdentifier.getInputIdentifier())
+       .append(", ")
+       .append(inputAttemptIdentifier.getAttemptNumber())
+       .append(", ")
+       .append(inputAttemptIdentifier.getPathComponent());
+   boolean finalMerge = inputAttemptIdentifier.getFetchTypeInfo()
+       == InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED;
+   if (!finalMerge) {
+     // spill details are only printed for non-final-merge fetches
+     sb.append(", ")
+         .append(inputAttemptIdentifier.getFetchTypeInfo().ordinal())
+         .append(", ")
+         .append(inputAttemptIdentifier.getSpillEventId());
+   }
+   return sb.append("}");
+ }
/**
* Log individual fetch complete event.
* This log information would be used by tez-tool/perf-analyzer/shuffle tools for mining
@@ -545,19 +569,37 @@ public class ShuffleUtils {
*/
public void logIndividualFetchComplete(long millis, long bytesCompressed,
long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) {
- double rate = 0;
- if (millis != 0) {
- rate = bytesCompressed / ((double) millis / 1000);
- rate = rate / (1024 * 1024);
- }
+
if (activeLogger.isInfoEnabled()) {
- activeLogger.info(
- "Completed fetch for attempt: "
- + toShortString(srcAttemptIdentifier)
- +" to " + outputType +
- ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed +
- ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
- MBPS_FORMAT.get().format(rate) + " MB/s");
+ long wholeMBs = 0;
+ long partialMBs = 0;
+ if (millis != 0) {
+ // fast math is done using integer math to avoid double to string conversion
+ // calculate MB/s * 100 to preserve MB/s precision to two decimal places:
+ // (bytes * 1000 * 100) / (millis * 2^20), i.e. multiply the numerator by
+ // 100000 (2^5 * 5^5) and the denominator by MB (2^20)
+ // simplify the fraction by factoring out 2^5 (3125 / 32768) to protect ourselves from overflow
+ wholeMBs = (bytesCompressed * 3125) / (millis * 32768);
+ partialMBs = wholeMBs % 100;
+ wholeMBs /= 100;
+ }
+ StringBuilder sb = new StringBuilder("Completed fetch for attempt: ");
+ toShortString(srcAttemptIdentifier, sb);
+ sb.append(" to ");
+ sb.append(outputType);
+ sb.append(", csize=");
+ sb.append(bytesCompressed);
+ sb.append(", dsize=");
+ sb.append(bytesDecompressed);
+ sb.append(", EndTime=");
+ sb.append(System.currentTimeMillis());
+ sb.append(", TimeTaken=");
+ sb.append(millis);
+ sb.append(", Rate=");
+ sb.append(wholeMBs);
+ sb.append(".");
+ MBPS_FAST_FORMAT.get().format(partialMBs, sb);
+ sb.append(" MB/s");
+ activeLogger.info(sb.toString());
} else {
long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime;
synchronized (this) {
@@ -583,21 +625,6 @@ public class ShuffleUtils {
}
}
- private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) {
- StringBuilder sb = new StringBuilder();
- sb.append("{");
- sb.append(inputAttemptIdentifier.getInputIdentifier());
- sb.append(", ").append(inputAttemptIdentifier.getAttemptNumber());
- sb.append(", ").append(inputAttemptIdentifier.getPathComponent());
- if (inputAttemptIdentifier.getFetchTypeInfo()
- != InputAttemptIdentifier.SPILL_INFO.FINAL_MERGE_ENABLED) {
- sb.append(", ").append(inputAttemptIdentifier.getFetchTypeInfo().ordinal());
- sb.append(", ").append(inputAttemptIdentifier.getSpillEventId());
- }
- sb.append("}");
- return sb.toString();
- }
-
/**
* Build {@link org.apache.tez.http.HttpConnectionParams} from configuration
*
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index bcb75d2..58ca1e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -504,11 +504,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
if (mapOutput.getType() == Type.MEMORY) {
ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
(int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
- ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
+ ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier());
} else if (mapOutput.getType() == Type.DISK) {
ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
input, compressedLength, decompressedLength, LOG,
- mapOutput.getAttemptIdentifier().toString(),
+ mapOutput.getAttemptIdentifier(),
ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
} else {
throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index f21da7c..b1ce716 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -284,7 +284,7 @@ public class TestShuffleUtils {
byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1};
try {
ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
- 1024, 128, mockCodec, false, 0, mock(Logger.class), "identifier");
+ 1024, 128, mockCodec, false, 0, mock(Logger.class), null);
Assert.fail("shuffle was supposed to throw!");
} catch (IOException e) {
Assert.assertTrue(e.getCause() instanceof InternalError);
@@ -301,14 +301,14 @@ public class TestShuffleUtils {
ByteArrayInputStream in = new ByteArrayInputStream(bogusData);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ShuffleUtils.shuffleToDisk(baos, "somehost", in,
- bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false);
+ bogusData.length, 2000, mock(Logger.class), null, false, 0, false);
Assert.assertArrayEquals(bogusData, baos.toByteArray());
// verify sending same stream of zeroes with validation generates an exception
in.reset();
try {
ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in,
- bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true);
+ bogusData.length, 2000, mock(Logger.class), null, false, 0, true);
Assert.fail("shuffle was supposed to throw!");
} catch (IOException e) {
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a9af6cfc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9209ff4..a812728 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -235,8 +235,8 @@ public class TestMergeManager {
assertEquals(0, mergeManager.getUsedMemory());
assertEquals(0, mergeManager.getCommitMemory());
- byte[] data1 = generateData(conf, 10);
- byte[] data2 = generateData(conf, 20);
+ byte[] data1 = generateData(conf, 10, null);
+ byte[] data2 = generateData(conf, 20, null);
MapOutput firstMapOutput = mergeManager.reserve(null, data1.length, data1.length, 0);
MapOutput secondMapOutput = mergeManager.reserve(null, data2.length, data2.length, 0);
assertEquals(MapOutput.Type.MEMORY, firstMapOutput.getType());
@@ -294,15 +294,19 @@ public class TestMergeManager {
* - After 3 segment commits, it would trigger mem-to-mem merge.
* - All of them can be merged in memory.
*/
- byte[] data1 = generateDataBySize(conf, 10);
- byte[] data2 = generateDataBySize(conf, 20);
- byte[] data3 = generateDataBySize(conf, 200);
- byte[] data4 = generateDataBySize(conf, 20000);
-
- MapOutput mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
- MapOutput mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
- MapOutput mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
- MapOutput mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+ InputAttemptIdentifier inputAttemptIdentifier1 = new InputAttemptIdentifier(0,0);
+ InputAttemptIdentifier inputAttemptIdentifier2 = new InputAttemptIdentifier(1,0);
+ InputAttemptIdentifier inputAttemptIdentifier3 = new InputAttemptIdentifier(2,0);
+ InputAttemptIdentifier inputAttemptIdentifier4 = new InputAttemptIdentifier(3,0);
+ byte[] data1 = generateDataBySize(conf, 10, inputAttemptIdentifier1);
+ byte[] data2 = generateDataBySize(conf, 20, inputAttemptIdentifier2);
+ byte[] data3 = generateDataBySize(conf, 200, inputAttemptIdentifier3);
+ byte[] data4 = generateDataBySize(conf, 20000, inputAttemptIdentifier4);
+
+ MapOutput mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+ MapOutput mo2 = mergeManager.reserve(inputAttemptIdentifier1, data2.length, data2.length, 0);
+ MapOutput mo3 = mergeManager.reserve(inputAttemptIdentifier1, data3.length, data3.length, 0);
+ MapOutput mo4 = mergeManager.reserve(inputAttemptIdentifier1, data4.length, data4.length, 0);
assertEquals(MapOutput.Type.MEMORY, mo1.getType());
assertEquals(MapOutput.Type.MEMORY, mo2.getType());
@@ -351,15 +355,15 @@ public class TestMergeManager {
mergeManager.configureAndStart();
//Single shuffle limit is 25% of 2000000
- data1 = generateDataBySize(conf, 10);
- data2 = generateDataBySize(conf, 400000);
- data3 = generateDataBySize(conf, 400000);
- data4 = generateDataBySize(conf, 400000);
+ data1 = generateDataBySize(conf, 10, inputAttemptIdentifier1);
+ data2 = generateDataBySize(conf, 400000, inputAttemptIdentifier2);
+ data3 = generateDataBySize(conf, 400000, inputAttemptIdentifier3);
+ data4 = generateDataBySize(conf, 400000, inputAttemptIdentifier4);
- mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
- mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
- mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
- mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+ mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
assertEquals(MapOutput.Type.MEMORY, mo1.getType());
assertEquals(MapOutput.Type.MEMORY, mo2.getType());
@@ -409,15 +413,15 @@ public class TestMergeManager {
mergeManager.configureAndStart();
//Single shuffle limit is 25% of 2000000
- data1 = generateDataBySize(conf, 400000);
- data2 = generateDataBySize(conf, 400000);
- data3 = generateDataBySize(conf, 400000);
- data4 = generateDataBySize(conf, 400000);
+ data1 = generateDataBySize(conf, 400000, inputAttemptIdentifier1);
+ data2 = generateDataBySize(conf, 400000, inputAttemptIdentifier2);
+ data3 = generateDataBySize(conf, 400000, inputAttemptIdentifier3);
+ data4 = generateDataBySize(conf, 400000, inputAttemptIdentifier4);
- mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
- mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
- mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
- mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+ mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
assertEquals(MapOutput.Type.MEMORY, mo1.getType());
assertEquals(MapOutput.Type.MEMORY, mo2.getType());
@@ -465,15 +469,15 @@ public class TestMergeManager {
mergeManager.configureAndStart();
//Single shuffle limit is 25% of 2000000
- data1 = generateDataBySize(conf, 490000);
- data2 = generateDataBySize(conf, 490000);
- data3 = generateDataBySize(conf, 490000);
- data4 = generateDataBySize(conf, 230000);
+ data1 = generateDataBySize(conf, 490000, inputAttemptIdentifier1);
+ data2 = generateDataBySize(conf, 490000, inputAttemptIdentifier2);
+ data3 = generateDataBySize(conf, 490000, inputAttemptIdentifier3);
+ data4 = generateDataBySize(conf, 230000, inputAttemptIdentifier4);
- mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
- mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
- mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
- mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+ mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000));
@@ -520,15 +524,15 @@ public class TestMergeManager {
mergeManager.configureAndStart();
//Single shuffle limit is 25% of 2000000
- data1 = generateDataBySize(conf, 490000);
- data2 = generateDataBySize(conf, 490000);
- data3 = generateDataBySize(conf, 490000);
- data4 = generateDataBySize(conf, 230000);
+ data1 = generateDataBySize(conf, 490000, inputAttemptIdentifier1);
+ data2 = generateDataBySize(conf, 490000, inputAttemptIdentifier2);
+ data3 = generateDataBySize(conf, 490000, inputAttemptIdentifier3);
+ data4 = generateDataBySize(conf, 230000, inputAttemptIdentifier4);
- mo1 = mergeManager.reserve(new InputAttemptIdentifier(0,0), data1.length, data1.length, 0);
- mo2 = mergeManager.reserve(new InputAttemptIdentifier(1,0), data2.length, data2.length, 0);
- mo3 = mergeManager.reserve(new InputAttemptIdentifier(2,0), data3.length, data3.length, 0);
- mo4 = mergeManager.reserve(new InputAttemptIdentifier(3,0), data4.length, data4.length, 0);
+ mo1 = mergeManager.reserve(inputAttemptIdentifier1, data1.length, data1.length, 0);
+ mo2 = mergeManager.reserve(inputAttemptIdentifier2, data2.length, data2.length, 0);
+ mo3 = mergeManager.reserve(inputAttemptIdentifier3, data3.length, data3.length, 0);
+ mo4 = mergeManager.reserve(inputAttemptIdentifier4, data4.length, data4.length, 0);
assertTrue(mergeManager.getUsedMemory() >= (490000 + 490000 + 490000 + 23000));
@@ -566,7 +570,7 @@ public class TestMergeManager {
Assert.assertFalse(mergeManager.isMergeComplete());
}
- private byte[] generateDataBySize(Configuration conf, int rawLen) throws IOException {
+ private byte[] generateDataBySize(Configuration conf, int rawLen, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
IFile.Writer writer =
@@ -584,11 +588,11 @@ public class TestMergeManager {
int rawLength = (int)writer.getRawLength();
byte[] data = new byte[rawLength];
ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
- rawLength, compressedLength, null, false, 0, LOG, "sometask");
+ rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier);
return data;
}
- private byte[] generateData(Configuration conf, int numEntries) throws IOException {
+ private byte[] generateData(Configuration conf, int numEntries, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
IFile.Writer writer =
@@ -601,7 +605,7 @@ public class TestMergeManager {
int rawLength = (int)writer.getRawLength();
byte[] data = new byte[rawLength];
ShuffleUtils.shuffleToMemory(data, new ByteArrayInputStream(baos.toByteArray()),
- rawLength, compressedLength, null, false, 0, LOG, "sometask");
+ rawLength, compressedLength, null, false, 0, LOG, inputAttemptIdentifier);
return data;
}