You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/06 07:29:52 UTC
git commit: TEZ-597. Broadcast Input should use the compressed size
when creating an On-Disk IFile reader. (sseth)
Updated Branches:
refs/heads/master 6fddbd01b -> 98ca0091d
TEZ-597. Broadcast Input should use the compressed size when creating an
On-Disk IFile reader. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/98ca0091
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/98ca0091
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/98ca0091
Branch: refs/heads/master
Commit: 98ca0091d9c119ea8a3f2cf21ab4db0de2a77c9b
Parents: 6fddbd0
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Nov 5 22:29:35 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Nov 5 22:29:35 2013 -0800
----------------------------------------------------------------------
.../tez/mapreduce/hadoop/MRJobConfig.java | 26 ------------
.../broadcast/input/BroadcastInputManager.java | 19 +++++----
.../broadcast/input/BroadcastKVReader.java | 4 +-
.../BroadcastShuffleInputEventHandler.java | 2 +-
.../input/BroadcastShuffleManager.java | 12 +++++-
.../task/local/output/TezTaskOutputFiles.java | 1 -
.../library/output/OnFileUnorderedKVOutput.java | 2 +-
.../shuffle/common/DiskFetchedInput.java | 29 +++----------
.../library/shuffle/common/FetchedInput.java | 16 +++++---
.../shuffle/common/FetchedInputAllocator.java | 2 +-
.../runtime/library/shuffle/common/Fetcher.java | 43 ++++++++++++++------
.../shuffle/common/MemoryFetchedInput.java | 11 ++---
.../input/TestBroadcastInputManager.java | 11 ++---
13 files changed, 85 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 940b1e0..0974080 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -652,30 +652,4 @@ public interface MRJobConfig {
public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
- // Stage specific properties
- // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
- // where suffix is one of MRR_INTERMEDIATE_STAGE_* fields defined below.
-// public static final String MRR_INTERMEDIATE_STAGE_TASKS = "tasks";
-// public static final String MRR_INTERMEDIATE_STAGE_CLASS = "class";
-// public static final String
-// MRR_INTERMEDIATE_STAGE_PARTITIONER_CLASS = "partitioner.class";
-// public static final String
-// MRR_INTERMEDIATE_STAGE_COMBINER_CLASS = "combiner.class";
-// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESS =
-// "output.compress";
-// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_COMPRESSION_CODEC =
-// "output.compression.codec";
-// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_CLASS =
-// "key.class";
-// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_KEY_COMPARATOR_CLASS
-// = "key.comparator.class";
-// public static final String MRR_INTERMEDIATE_STAGE_OUTPUT_VALUE_CLASS =
-// "value.class";
-// public static final String MRR_INTERMEDIATE_STAGE_SPECULATE =
-// "speculate";
-// public static final String MRR_INTERMEDIATE_STAGE_MEMORY_MB =
-// "memory.mb";
-// public static final String MRR_INTERMEDIATE_STAGE_CHILD_JAVA_OPTS =
-// "child.java.opts";
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index 512db50..456b8b6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -89,16 +89,17 @@ public class BroadcastInputManager implements FetchedInputAllocator,
}
@Override
- public synchronized FetchedInput allocate(long size,
+ public synchronized FetchedInput allocate(long actualSize, long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
- if (size > maxSingleShuffleLimit
- || this.usedMemory + size > this.memoryLimit) {
- return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
- localDirAllocator, fileNameAllocator);
+ if (actualSize > maxSingleShuffleLimit
+ || this.usedMemory + actualSize > this.memoryLimit) {
+ return new DiskFetchedInput(actualSize, compressedSize,
+ inputAttemptIdentifier, this, conf, localDirAllocator,
+ fileNameAllocator);
} else {
- this.usedMemory += size;
- LOG.info("Used memory after allocating " + size + " : " + usedMemory);
- return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
+ this.usedMemory += actualSize;
+ LOG.info("Used memory after allocating " + actualSize + " : " + usedMemory);
+ return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
}
}
@@ -130,7 +131,7 @@ public class BroadcastInputManager implements FetchedInputAllocator,
case DISK:
break;
case MEMORY:
- unreserve(fetchedInput.getSize());
+ unreserve(fetchedInput.getActualSize());
break;
default:
throw new TezUncheckedException("InputType: " + fetchedInput.getType()
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 0b12a53..da74ebd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -178,10 +178,10 @@ public class BroadcastKVReader<K, V> implements KeyValueReader {
MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
- mfi.getBytes(), 0, (int) mfi.getSize());
+ mfi.getBytes(), 0, (int) mfi.getActualSize());
} else {
return new IFile.Reader(fetchedInput.getInputStream(),
- fetchedInput.getSize(), codec, null, ifileReadAhead,
+ fetchedInput.getCompressedSize(), codec, null, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index f9976d6..a7a12ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -101,7 +101,7 @@ public class BroadcastShuffleInputEventHandler {
shufflePayload.getPathComponent());
if (shufflePayload.hasData()) {
DataProto dataProto = shufflePayload.getData();
- FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), srcAttemptIdentifier);
+ FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
moveDataToFetchedInput(dataProto, fetchedInput);
shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 7246359..a4acff6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -465,7 +465,14 @@ public class BroadcastShuffleManager implements FetcherCallback {
InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
// TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
// For now, reporting immediately.
- LOG.info("Fetch failed for src: " + srcAttemptIdentifier + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed);
+ LOG.info("Fetch failed for src: " + srcAttemptIdentifier
+ + "InputIdentifier: " + srcAttemptIdentifier + ", connectFailed: "
+ + connectFailed);
+ if (srcAttemptIdentifier == null) {
+ String message = "Received fetchFailure for an unknown src (null)";
+ LOG.fatal(message);
+ inputContext.fatalError(null, message);
+ } else {
InputReadErrorEvent readError = new InputReadErrorEvent(
"Fetch failure while fetching from "
+ TezRuntimeUtils.getTaskAttemptIdentifier(
@@ -478,6 +485,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
failedEvents.add(readError);
inputContext.sendEvents(failedEvents);
+ }
}
/////////////////// End of Methods from FetcherCallbackHandler
@@ -564,7 +572,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
private class NullFetchedInput extends FetchedInput {
public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
- super(Type.MEMORY, -1, inputAttemptIdentifier, null);
+ super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index 847a0bf..5111eef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -105,7 +105,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
* @throws IOException
*/
public Path getOutputFileForWrite() throws IOException {
- // TODO how to write 2 different broadcast outputs?????
Path attemptOutput =
new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 658e993..5c49da3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -101,7 +101,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
- LOG.info("Closing KVOutput: RawLegnth: " + this.kvWriter.getRawLength()
+ LOG.info("Closing KVOutput: RawLength: " + this.kvWriter.getRawLength()
+ ", CompressedLength: " + this.kvWriter.getCompressedLength());
if (dataViaEventsEnabled && outputGenerated && this.kvWriter.getCompressedLength() <= dataViaEventsMaxSize) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
index 3cdf20e..9aeb65d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -28,10 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import com.google.common.base.Preconditions;
@@ -43,25 +40,17 @@ public class DiskFetchedInput extends FetchedInput {
private final FileSystem localFS;
private final Path tmpOutputPath;
private final Path outputPath;
-
- private static final long checkSumSize;
- static {
- DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
- Integer.MAX_VALUE);
- checkSumSize = sum.getChecksumSize();
- }
-
- public DiskFetchedInput(long size,
+ public DiskFetchedInput(long actualSize, long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier,
FetchedInputCallback callbackHandler, Configuration conf,
LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
throws IOException {
- super(Type.DISK, size, inputAttemptIdentifier, callbackHandler);
+ super(Type.DISK, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
this.localFS = FileSystem.getLocal(conf);
this.outputPath = filenameAllocator.getInputFileForWrite(
- this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
+ this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), actualSize);
this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
}
@@ -69,13 +58,6 @@ public class DiskFetchedInput extends FetchedInput {
public OutputStream getOutputStream() throws IOException {
return localFS.create(tmpOutputPath);
}
-
- // Assumes that the file written to disk is an IFile that has a checksum
- // at the end. The size in super is the real data size.
- @Override
- public long getSize() {
- return super.getSize() + checkSumSize;
- }
@Override
public InputStream getInputStream() throws IOException {
@@ -123,7 +105,8 @@ public class DiskFetchedInput extends FetchedInput {
@Override
public String toString() {
return "DiskFetchedInput [outputPath=" + outputPath
- + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size="
- + size + ", type=" + type + ", id=" + id + ", state=" + state + "]";
+ + ", inputAttemptIdentifier=" + inputAttemptIdentifier
+ + ", actualSize=" + actualSize + ",compressedSize=" + compressedSize
+ + ", type=" + type + ", id=" + id + ", state=" + state + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
index 8f3c407..0bb765d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -42,17 +42,19 @@ public abstract class FetchedInput {
private static AtomicInteger ID_GEN = new AtomicInteger(0);
protected InputAttemptIdentifier inputAttemptIdentifier;
- protected final long size;
+ protected final long actualSize;
+ protected final long compressedSize;
protected final Type type;
protected final FetchedInputCallback callback;
protected final int id;
protected State state;
- public FetchedInput(Type type, long size,
+ public FetchedInput(Type type, long actualSize, long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier,
FetchedInputCallback callbackHandler) {
this.type = type;
- this.size = size;
+ this.actualSize = actualSize;
+ this.compressedSize = compressedSize;
this.inputAttemptIdentifier = inputAttemptIdentifier;
this.callback = callbackHandler;
this.id = ID_GEN.getAndIncrement();
@@ -63,8 +65,12 @@ public abstract class FetchedInput {
return this.type;
}
- public long getSize() {
- return this.size;
+ public long getActualSize() {
+ return this.actualSize;
+ }
+
+ public long getCompressedSize() {
+ return this.compressedSize;
}
public InputAttemptIdentifier getInputAttemptIdentifier() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
index 1d60b68..1707ab7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
@@ -24,7 +24,7 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
public interface FetchedInputAllocator {
- public FetchedInput allocate(long size,
+ public FetchedInput allocate(long actualSize, long compresedSize,
InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index 96f1caf..70059b0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -27,11 +27,10 @@ import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
@@ -92,7 +91,7 @@ public class Fetcher implements Callable<FetchResult> {
// Maps from the pathComponents (unique per srcTaskId) to the specific taskId
private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
- private Set<InputAttemptIdentifier> remaining;
+ private LinkedHashSet<InputAttemptIdentifier> remaining;
private URL url;
private String encHash;
@@ -140,7 +139,7 @@ public class Fetcher implements Callable<FetchResult> {
pathToAttemptMap.put(in.getPathComponent(), in);
}
- remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
+ remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);
HttpURLConnection connection;
try {
@@ -217,10 +216,11 @@ public class Fetcher implements Callable<FetchResult> {
long startTime = System.currentTimeMillis();
int responsePartition = -1;
// Read the shuffle header
+ String pathComponent = null;
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
- String pathComponent = header.getMapId();
+ pathComponent = header.getMapId();
srcAttemptId = pathToAttemptMap.get(pathComponent);
compressedLength = header.getCompressedLength();
@@ -235,7 +235,12 @@ public class Fetcher implements Callable<FetchResult> {
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength,
- responsePartition, srcAttemptId)) {
+ responsePartition, srcAttemptId, pathComponent)) {
+ if (srcAttemptId == null) {
+ LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
+ srcAttemptId = getNextRemainingAttempt();
+ }
+ assert(srcAttemptId != null);
return new InputAttemptIdentifier[] { srcAttemptId };
}
@@ -245,7 +250,7 @@ public class Fetcher implements Callable<FetchResult> {
}
// Get the location for the map output - either in-memory or on-disk
- fetchedInput = inputManager.allocate(decompressedLength, srcAttemptId);
+ fetchedInput = inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
// TODO NEWTEZ No concept of WAIT at the moment.
// // Check if we can shuffle *now* ...
@@ -317,18 +322,22 @@ public class Fetcher implements Callable<FetchResult> {
* @return true/false, based on if the verification succeeded or not
*/
private boolean verifySanity(long compressedLength, long decompressedLength,
- int fetchPartition, InputAttemptIdentifier srcAttemptId) {
+ int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
if (compressedLength < 0 || decompressedLength < 0) {
// wrongLengthErrs.increment(1);
- LOG.warn(" invalid lengths in input header: id: " + srcAttemptId
+ LOG.warn(" invalid lengths in input header -> headerPathComponent: "
+ + pathComponent + ", nextRemainingSrcAttemptId: "
+ + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ " len: " + compressedLength + ", decomp len: " + decompressedLength);
return false;
}
if (fetchPartition != this.partition) {
// wrongReduceErrs.increment(1);
- LOG.warn(" data for the wrong reduce map: " + srcAttemptId + " len: "
- + compressedLength + " decomp len: " + decompressedLength
+ LOG.warn(" data for the wrong reduce -> headerPathComponent: "
+ + pathComponent + "nextRemainingSrcAttemptId: "
+ + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId
+ + " len: " + compressedLength + " decomp len: " + decompressedLength
+ " for reduce " + fetchPartition);
return false;
}
@@ -336,11 +345,21 @@ public class Fetcher implements Callable<FetchResult> {
// Sanity check
if (!remaining.contains(srcAttemptId)) {
// wrongMapErrs.increment(1);
- LOG.warn("Invalid input. Received output for " + srcAttemptId);
+ LOG.warn("Invalid input. Received output for headerPathComponent: "
+ + pathComponent + "nextRemainingSrcAttemptId: "
+ + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
return false;
}
return true;
}
+
+ private InputAttemptIdentifier getNextRemainingAttempt() {
+ if (remaining.size() > 0) {
+ return remaining.iterator().next();
+ } else {
+ return null;
+ }
+ }
private HttpURLConnection connectToShuffleHandler(String host, int port,
int partition, List<InputAttemptIdentifier> inputs) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
index e34301e..cc559d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
@@ -31,11 +31,11 @@ public class MemoryFetchedInput extends FetchedInput {
private BoundedByteArrayOutputStream byteStream;
- public MemoryFetchedInput(long size,
+ public MemoryFetchedInput(long actualSize, long compressedSize,
InputAttemptIdentifier inputAttemptIdentifier,
FetchedInputCallback callbackHandler) {
- super(Type.MEMORY, size, inputAttemptIdentifier, callbackHandler);
- this.byteStream = new BoundedByteArrayOutputStream((int) size);
+ super(Type.MEMORY, actualSize, compressedSize, inputAttemptIdentifier, callbackHandler);
+ this.byteStream = new BoundedByteArrayOutputStream((int) actualSize);
}
@Override
@@ -83,7 +83,8 @@ public class MemoryFetchedInput extends FetchedInput {
@Override
public String toString() {
return "MemoryFetchedInput [inputAttemptIdentifier="
- + inputAttemptIdentifier + ", size=" + size + ", type=" + type
- + ", id=" + id + ", state=" + state + "]";
+ + inputAttemptIdentifier + ", actualSize=" + actualSize
+ + ", compressedSize=" + compressedSize + ", type=" + type + ", id="
+ + id + ", state=" + state + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/98ca0091/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
index e6603c4..2d663a6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -53,31 +53,32 @@ public class TestBroadcastInputManager {
BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(), conf);
long requestSize = (long) (0.4f * inMemThreshold);
+ long compressedSize = 1l;
LOG.info("RequestSize: " + requestSize);
- FetchedInput fi1 = inputManager.allocate(requestSize, new InputAttemptIdentifier(1, 1));
+ FetchedInput fi1 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(1, 1));
assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
- FetchedInput fi2 = inputManager.allocate(requestSize, new InputAttemptIdentifier(2, 1));
+ FetchedInput fi2 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(2, 1));
assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
// Over limit by this point. Next reserve should give back a DISK allocation
- FetchedInput fi3 = inputManager.allocate(requestSize, new InputAttemptIdentifier(3, 1));
+ FetchedInput fi3 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(3, 1));
assertEquals(FetchedInput.Type.DISK, fi3.getType());
// Freed one memory allocation. Next should be mem again.
fi1.abort();
fi1.free();
- FetchedInput fi4 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+ FetchedInput fi4 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
// Freed one disk allocation. Next sould be disk again (no mem freed)
fi3.abort();
fi3.free();
- FetchedInput fi5 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+ FetchedInput fi5 = inputManager.allocate(requestSize, compressedSize, new InputAttemptIdentifier(4, 1));
assertEquals(FetchedInput.Type.DISK, fi5.getType());
}